Skip to content

Commit

Permalink
Add query/reply ok(put|del)/err() tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Feb 8, 2024
1 parent 9c1598a commit 198c715
Showing 1 changed file with 65 additions and 6 deletions.
71 changes: 65 additions & 6 deletions zenoh/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,31 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re
let c_msgs = msgs.clone();
let qbl = ztimeout!(peer01
.declare_queryable(key_expr)
.callback(move |sample| {
.callback(move |query| {
c_msgs.fetch_add(1, Ordering::Relaxed);
let rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap();
task::block_on(async { ztimeout!(sample.reply(Ok(rep)).res_async()).unwrap() });
match query.parameters() {
"ok_put" => {
let mut rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap();
rep.kind = SampleKind::Put;
task::block_on(async {
ztimeout!(query.reply(Ok(rep)).res_async()).unwrap()
});
}
"ok_del" => {
let mut rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap();
rep.kind = SampleKind::Delete;
task::block_on(async {
ztimeout!(query.reply(Ok(rep)).res_async()).unwrap()
});
}
"err" => {
let rep = Value::from(vec![0u8; size]);
task::block_on(async {
ztimeout!(query.reply(Err(rep)).res_async()).unwrap()
});
}
_ => panic!("Unknown query parameter"),
}
})
.res_async())
.unwrap();
Expand All @@ -165,19 +186,57 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re
task::sleep(SLEEP).await;

// Get data
println!("[QR][02c] Getting on peer02 session. {msg_count} msgs.");
println!("[QR][02c] Getting Ok(Put) on peer02 session. {msg_count} msgs.");
let mut cnt = 0;
for _ in 0..msg_count {
let rs = ztimeout!(peer02.get(key_expr).res_async()).unwrap();
let selector = format!("{}?ok_put", key_expr);
let rs = ztimeout!(peer02.get(selector).res_async()).unwrap();
while let Ok(s) = ztimeout!(rs.recv_async()) {
assert_eq!(s.sample.unwrap().value.payload.len(), size);
let s = s.sample.unwrap();
assert_eq!(s.kind, SampleKind::Put);
assert_eq!(s.value.payload.len(), size);
cnt += 1;
}
}
println!("[QR][02c] Got on peer02 session. {cnt}/{msg_count} msgs.");
assert_eq!(msgs.load(Ordering::Relaxed), msg_count);
assert_eq!(cnt, msg_count);

msgs.store(0, Ordering::Relaxed);

println!("[QR][03c] Getting Ok(Delete) on peer02 session. {msg_count} msgs.");
let mut cnt = 0;
for _ in 0..msg_count {
let selector = format!("{}?ok_del", key_expr);
let rs = ztimeout!(peer02.get(selector).res_async()).unwrap();
while let Ok(s) = ztimeout!(rs.recv_async()) {
let s = s.sample.unwrap();
assert_eq!(s.kind, SampleKind::Delete);
assert_eq!(s.value.payload.len(), 0);
cnt += 1;
}
}
println!("[QR][03c] Got on peer02 session. {cnt}/{msg_count} msgs.");
assert_eq!(msgs.load(Ordering::Relaxed), msg_count);
assert_eq!(cnt, msg_count);

msgs.store(0, Ordering::Relaxed);

println!("[QR][04c] Getting Err() on peer02 session. {msg_count} msgs.");
let mut cnt = 0;
for _ in 0..msg_count {
let selector = format!("{}?err", key_expr);
let rs = ztimeout!(peer02.get(selector).res_async()).unwrap();
while let Ok(s) = ztimeout!(rs.recv_async()) {
let e = s.sample.unwrap_err();
assert_eq!(e.payload.len(), size);
cnt += 1;
}
}
println!("[QR][04c] Got on peer02 session. {cnt}/{msg_count} msgs.");
assert_eq!(msgs.load(Ordering::Relaxed), msg_count);
assert_eq!(cnt, msg_count);

println!("[PS][03c] Unqueryable on peer01 session");
ztimeout!(qbl.undeclare().res_async()).unwrap();

Expand Down

0 comments on commit 198c715

Please sign in to comment.