From 198c715f11c1e1834f25336a4039a6c0328d2396 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Thu, 8 Feb 2024 10:02:00 +0100 Subject: [PATCH] Add query/reply ok(put|del)/err() tests --- zenoh/tests/session.rs | 71 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 65 insertions(+), 6 deletions(-) diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index c2cec7c627..f727ad60c3 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -153,10 +153,31 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re let c_msgs = msgs.clone(); let qbl = ztimeout!(peer01 .declare_queryable(key_expr) - .callback(move |sample| { + .callback(move |query| { c_msgs.fetch_add(1, Ordering::Relaxed); - let rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap(); - task::block_on(async { ztimeout!(sample.reply(Ok(rep)).res_async()).unwrap() }); + match query.parameters() { + "ok_put" => { + let mut rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap(); + rep.kind = SampleKind::Put; + task::block_on(async { + ztimeout!(query.reply(Ok(rep)).res_async()).unwrap() + }); + } + "ok_del" => { + let mut rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap(); + rep.kind = SampleKind::Delete; + task::block_on(async { + ztimeout!(query.reply(Ok(rep)).res_async()).unwrap() + }); + } + "err" => { + let rep = Value::from(vec![0u8; size]); + task::block_on(async { + ztimeout!(query.reply(Err(rep)).res_async()).unwrap() + }); + } + _ => panic!("Unknown query parameter"), + } }) .res_async()) .unwrap(); @@ -165,12 +186,15 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re task::sleep(SLEEP).await; // Get data - println!("[QR][02c] Getting on peer02 session. {msg_count} msgs."); + println!("[QR][02c] Getting Ok(Put) on peer02 session. {msg_count} msgs."); let mut cnt = 0; for _ in 0..msg_count { - let rs = ztimeout!(peer02.get(key_expr).res_async()).unwrap(); + let selector = format!("{}?ok_put", key_expr); + let rs = ztimeout!(peer02.get(selector).res_async()).unwrap(); while let Ok(s) = ztimeout!(rs.recv_async()) { - assert_eq!(s.sample.unwrap().value.payload.len(), size); + let s = s.sample.unwrap(); + assert_eq!(s.kind, SampleKind::Put); + assert_eq!(s.value.payload.len(), size); cnt += 1; } } @@ -178,6 +202,41 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re assert_eq!(msgs.load(Ordering::Relaxed), msg_count); assert_eq!(cnt, msg_count); + msgs.store(0, Ordering::Relaxed); + + println!("[QR][03c] Getting Ok(Delete) on peer02 session. {msg_count} msgs."); + let mut cnt = 0; + for _ in 0..msg_count { + let selector = format!("{}?ok_del", key_expr); + let rs = ztimeout!(peer02.get(selector).res_async()).unwrap(); + while let Ok(s) = ztimeout!(rs.recv_async()) { + let s = s.sample.unwrap(); + assert_eq!(s.kind, SampleKind::Delete); + assert_eq!(s.value.payload.len(), 0); + cnt += 1; + } + } + println!("[QR][03c] Got on peer02 session. {cnt}/{msg_count} msgs."); + assert_eq!(msgs.load(Ordering::Relaxed), msg_count); + assert_eq!(cnt, msg_count); + + msgs.store(0, Ordering::Relaxed); + + println!("[QR][04c] Getting Err() on peer02 session. {msg_count} msgs."); + let mut cnt = 0; + for _ in 0..msg_count { + let selector = format!("{}?err", key_expr); + let rs = ztimeout!(peer02.get(selector).res_async()).unwrap(); + while let Ok(s) = ztimeout!(rs.recv_async()) { + let e = s.sample.unwrap_err(); + assert_eq!(e.payload.len(), size); + cnt += 1; + } + } + println!("[QR][04c] Got on peer02 session. {cnt}/{msg_count} msgs."); + assert_eq!(msgs.load(Ordering::Relaxed), msg_count); + assert_eq!(cnt, msg_count); + println!("[PS][03c] Unqueryable on peer01 session"); ztimeout!(qbl.undeclare().res_async()).unwrap();