Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queries duplicates fix #733

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,17 +316,26 @@ impl HatQueriesTrait for HatCode {
let mres = mres.upgrade().unwrap();
let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes());
for (sid, context) in &mres.session_ctxs {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid);
if let Some(qabl_info) = context.qabl.as_ref() {
route.push(QueryTargetQabl {
direction: (context.face.clone(), key_expr.to_owned(), NodeId::default()),
complete: if complete {
qabl_info.complete as u64
} else {
0
},
distance: 0.5,
});
if match tables.whatami {
WhatAmI::Router => context.face.whatami != WhatAmI::Router,
_ => source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client,
} {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid);
if let Some(qabl_info) = context.qabl.as_ref() {
route.push(QueryTargetQabl {
direction: (
context.face.clone(),
key_expr.to_owned(),
NodeId::default(),
),
complete: if complete {
qabl_info.complete as u64
} else {
0
},
distance: 0.5,
});
}
}
}
}
Expand Down
278 changes: 278 additions & 0 deletions zenoh/tests/unicity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::prelude::FutureExt;
use async_std::task;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use zenoh::prelude::r#async::*;
use zenoh_core::zasync_executor_init;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);

const MSG_SIZE: [usize; 2] = [1_024, 100_000];

macro_rules! ztimeout {
($f:expr) => {
$f.timeout(TIMEOUT).await.unwrap()
};
}

async fn open_p2p_sessions() -> (Session, Session, Session) {
// Open the sessions
let mut config = config::peer();
config.listen.endpoints = vec!["tcp/127.0.0.1:27447".parse().unwrap()];
config.scouting.multicast.set_enabled(Some(false)).unwrap();
println!("[ ][01a] Opening s01 session");
let s01 = ztimeout!(zenoh::open(config).res_async()).unwrap();

let mut config = config::peer();
config.listen.endpoints = vec!["tcp/127.0.0.1:27448".parse().unwrap()];
config.connect.endpoints = vec!["tcp/127.0.0.1:27447".parse().unwrap()];
config.scouting.multicast.set_enabled(Some(false)).unwrap();
println!("[ ][02a] Opening s02 session");
let s02 = ztimeout!(zenoh::open(config).res_async()).unwrap();

let mut config = config::peer();
config.connect.endpoints = vec![
"tcp/127.0.0.1:27447".parse().unwrap(),
"tcp/127.0.0.1:27448".parse().unwrap(),
];
config.scouting.multicast.set_enabled(Some(false)).unwrap();
println!("[ ][03a] Opening s03 session");
let s03 = ztimeout!(zenoh::open(config).res_async()).unwrap();

(s01, s02, s03)
}

async fn open_router_session() -> Session {
// Open the sessions
let mut config = config::default();
config.set_mode(Some(WhatAmI::Router)).unwrap();
config.listen.endpoints = vec!["tcp/127.0.0.1:37447".parse().unwrap()];
config.scouting.multicast.set_enabled(Some(false)).unwrap();
println!("[ ][00a] Opening router session");
ztimeout!(zenoh::open(config).res_async()).unwrap()
}

async fn close_router_session(s: Session) {
println!("[ ][01d] Closing router session");
ztimeout!(s.close().res_async()).unwrap();
}

async fn open_client_sessions() -> (Session, Session, Session) {
// Open the sessions
let config = config::client(["tcp/127.0.0.1:37447".parse::<EndPoint>().unwrap()]);
println!("[ ][01a] Opening s01 session");
let s01 = ztimeout!(zenoh::open(config).res_async()).unwrap();

let config = config::client(["tcp/127.0.0.1:37447".parse::<EndPoint>().unwrap()]);
println!("[ ][02a] Opening s02 session");
let s02 = ztimeout!(zenoh::open(config).res_async()).unwrap();

let config = config::client(["tcp/127.0.0.1:37447".parse::<EndPoint>().unwrap()]);
println!("[ ][03a] Opening s03 session");
let s03 = ztimeout!(zenoh::open(config).res_async()).unwrap();

(s01, s02, s03)
}

async fn close_sessions(s01: Session, s02: Session, s03: Session) {
println!("[ ][01d] Closing s01 session");
ztimeout!(s01.close().res_async()).unwrap();
println!("[ ][02d] Closing s02 session");
ztimeout!(s02.close().res_async()).unwrap();
println!("[ ][03d] Closing s03 session");
ztimeout!(s03.close().res_async()).unwrap();
}

async fn test_unicity_pubsub(s01: &Session, s02: &Session, s03: &Session) {
let key_expr = "test/unicity";
let msg_count = 1;
let msgs1 = Arc::new(AtomicUsize::new(0));
let msgs2 = Arc::new(AtomicUsize::new(0));

for size in MSG_SIZE {
msgs1.store(0, Ordering::Relaxed);
msgs2.store(0, Ordering::Relaxed);

// Subscribe to data
println!("[PS][01b] Subscribing on s01 session");
let c_msgs1 = msgs1.clone();
let sub1 = ztimeout!(s01
.declare_subscriber(key_expr)
.callback(move |sample| {
assert_eq!(sample.value.payload.len(), size);
c_msgs1.fetch_add(1, Ordering::Relaxed);
})
.res_async())
.unwrap();

// Subscribe to data
println!("[PS][02b] Subscribing on s02 session");
let c_msgs2 = msgs2.clone();
let sub2 = ztimeout!(s02
.declare_subscriber(key_expr)
.callback(move |sample| {
assert_eq!(sample.value.payload.len(), size);
c_msgs2.fetch_add(1, Ordering::Relaxed);
})
.res_async())
.unwrap();

// Wait for the declaration to propagate
task::sleep(SLEEP).await;

// Put data
println!("[PS][03b] Putting on s03 session. {msg_count} msgs of {size} bytes.");
for _ in 0..msg_count {
ztimeout!(s03
.put(key_expr, vec![0u8; size])
.congestion_control(CongestionControl::Block)
.res_async())
.unwrap();
}

ztimeout!(async {
loop {
let cnt1 = msgs1.load(Ordering::Relaxed);
let cnt2 = msgs2.load(Ordering::Relaxed);
println!("[PS][01b] Received {cnt1}/{msg_count}.");
println!("[PS][02b] Received {cnt2}/{msg_count}.");
if cnt1 < msg_count || cnt2 < msg_count {
task::sleep(SLEEP).await;
} else {
break;
}
}
});

task::sleep(SLEEP).await;

let cnt1 = msgs1.load(Ordering::Relaxed);
println!("[QR][01c] Got on s01 session. {cnt1}/{msg_count} msgs.");
assert_eq!(cnt1, msg_count);
let cnt2 = msgs1.load(Ordering::Relaxed);
println!("[QR][02c] Got on s02 session. {cnt2}/{msg_count} msgs.");
assert_eq!(cnt2, msg_count);

println!("[PS][02b] Unsubscribing on s02 session");
ztimeout!(sub2.undeclare().res_async()).unwrap();

println!("[PS][01b] Unsubscribing on s01 session");
ztimeout!(sub1.undeclare().res_async()).unwrap();

// Wait for the declaration to propagate
task::sleep(SLEEP).await;
}
}

async fn test_unicity_qryrep(s01: &Session, s02: &Session, s03: &Session) {
let key_expr = "test/unicity";
let msg_count = 1;
let msgs1 = Arc::new(AtomicUsize::new(0));
let msgs2 = Arc::new(AtomicUsize::new(0));

for size in MSG_SIZE {
msgs1.store(0, Ordering::Relaxed);
msgs2.store(0, Ordering::Relaxed);

// Queryable to data
println!("[QR][01c] Queryable on s01 session");
let c_msgs1 = msgs1.clone();
let qbl1 = ztimeout!(s01
.declare_queryable(key_expr)
.callback(move |sample| {
c_msgs1.fetch_add(1, Ordering::Relaxed);
let rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap();
task::block_on(async { ztimeout!(sample.reply(Ok(rep)).res_async()).unwrap() });
})
.res_async())
.unwrap();

// Queryable to data
println!("[QR][02c] Queryable on s02 session");
let c_msgs2 = msgs2.clone();
let qbl2 = ztimeout!(s02
.declare_queryable(key_expr)
.callback(move |sample| {
c_msgs2.fetch_add(1, Ordering::Relaxed);
let rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap();
task::block_on(async { ztimeout!(sample.reply(Ok(rep)).res_async()).unwrap() });
})
.res_async())
.unwrap();

// Wait for the declaration to propagate
task::sleep(SLEEP).await;

// Get data
println!("[QR][03c] Getting on s03 session. {msg_count} msgs.");
let mut cnt = 0;
for _ in 0..msg_count {
let rs = ztimeout!(s03.get(key_expr).res_async()).unwrap();
while let Ok(s) = ztimeout!(rs.recv_async()) {
assert_eq!(s.sample.unwrap().value.payload.len(), size);
cnt += 1;
}
}
let cnt1 = msgs1.load(Ordering::Relaxed);
println!("[QR][01c] Got on s01 session. {cnt1}/{msg_count} msgs.");
assert_eq!(cnt1, msg_count);
let cnt2 = msgs1.load(Ordering::Relaxed);
println!("[QR][02c] Got on s02 session. {cnt2}/{msg_count} msgs.");
assert_eq!(cnt2, msg_count);
println!("[QR][03c] Got on s03 session. {cnt}/{msg_count} msgs.");
assert_eq!(cnt, msg_count);

println!("[PS][01c] Unqueryable on s01 session");
ztimeout!(qbl1.undeclare().res_async()).unwrap();

println!("[PS][02c] Unqueryable on s02 session");
ztimeout!(qbl2.undeclare().res_async()).unwrap();

// Wait for the declaration to propagate
task::sleep(SLEEP).await;
}
}

#[test]
fn zenoh_unicity_p2p() {
task::block_on(async {
zasync_executor_init!();
let _ = env_logger::try_init();

let (s01, s02, s03) = open_p2p_sessions().await;
test_unicity_pubsub(&s01, &s02, &s03).await;
test_unicity_qryrep(&s01, &s02, &s03).await;
close_sessions(s01, s02, s03).await;
});
}

#[test]
fn zenoh_unicity_brokered() {
task::block_on(async {
zasync_executor_init!();
let _ = env_logger::try_init();
let r = open_router_session().await;

let (s01, s02, s03) = open_client_sessions().await;
test_unicity_pubsub(&s01, &s02, &s03).await;
test_unicity_qryrep(&s01, &s02, &s03).await;
close_sessions(s01, s02, s03).await;

close_router_session(r).await;
});
}