Skip to content

Commit

Permalink
Add linkstate liveliness tests
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Aug 20, 2024
1 parent cf00089 commit a0c775d
Showing 1 changed file with 119 additions and 20 deletions.
139 changes: 119 additions & 20 deletions zenoh/tests/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,16 @@ impl Task {
loop {
tokio::select! {
_ = token.cancelled() => break,
res = sub.recv_async() => {
if res.is_ok() {
res = tokio::time::timeout(TIMEOUT, sub.recv_async()) => {
if res?.is_ok() {
counter += 1;
if counter >= MSG_COUNT {
println!("LivelinessSub received sufficient amount of messages. Done.");
break;
}
} else {
println!("LivelinessSub recv error.");
break;
}
}
}
Expand Down Expand Up @@ -1049,6 +1052,7 @@ async fn three_node_combination_multicast() -> Result<()> {
Ok(())
}

#[cfg(feature = "unstable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 9)]
async fn peer_linkstate() -> Result<()> {
zenoh_util::try_init_log_from_env();
Expand Down Expand Up @@ -1089,6 +1093,8 @@ async fn peer_linkstate() -> Result<()> {

let ke_pubsub = format!("peer_linkstate_keyexpr_pubsub_{idx}");
let ke_getqueryable = format!("peer_linkstate_keyexpr_getqueryable_{idx}");
let ke_subliveliness = format!("peer_linkstate_keyexpr_subliveliness_{idx}");
let ke_getliveliness = format!("peer_linkstate_keyexpr_getliveliness_{idx}");

use rand::Rng;
let mut rng = rand::thread_rng();
Expand All @@ -1103,7 +1109,7 @@ async fn peer_linkstate() -> Result<()> {
..Default::default()
};

let (pub_node, queryable_node) = {
let (pub_node, queryable_node, liveliness_node, livelinessloop_node) = {
let base = Node {
mode: WhatAmI::Peer,
connect: vec![locator.clone()],
Expand All @@ -1120,7 +1126,7 @@ async fn peer_linkstate() -> Result<()> {
)])]);
pub_node.warmup += Duration::from_millis(rng.gen_range(0..500));

let mut queryable_node = base;
let mut queryable_node = base.clone();
queryable_node.name = "Queryable Peer".to_string();
queryable_node.con_task =
ConcurrentTask::from([SequentialTask::from([Task::Queryable(
Expand All @@ -1129,10 +1135,31 @@ async fn peer_linkstate() -> Result<()> {
)])]);
queryable_node.warmup += Duration::from_millis(rng.gen_range(0..500));

(pub_node, queryable_node)
let mut liveliness_node = base.clone();
liveliness_node.name = "Liveliness Peer".to_string();
liveliness_node.con_task =
ConcurrentTask::from([SequentialTask::from([Task::Liveliness(
ke_getliveliness.clone(),
)])]);
liveliness_node.warmup += Duration::from_millis(rng.gen_range(0..500));

let mut livelinessloop_node = base;
livelinessloop_node.name = "LivelinessLoop Peer".to_string();
livelinessloop_node.con_task =
ConcurrentTask::from([SequentialTask::from([Task::LivelinessLoop(
ke_subliveliness.clone(),
)])]);
livelinessloop_node.warmup += Duration::from_millis(rng.gen_range(0..500));

(
pub_node,
queryable_node,
liveliness_node,
livelinessloop_node,
)
};

let (sub_node, get_node) = {
let (sub_node, get_node, livelinessget_node, livelinesssub_node) = {
let base = Node {
mode: WhatAmI::Peer,
connect: vec![locator],
Expand All @@ -1149,30 +1176,50 @@ async fn peer_linkstate() -> Result<()> {
])]);
sub_node.warmup += Duration::from_millis(rng.gen_range(0..500));

let mut get_node = base;
let mut get_node = base.clone();
get_node.name = "Get Peer".to_string();
get_node.con_task = ConcurrentTask::from([SequentialTask::from([
Task::Get(ke_getqueryable, msg_size),
Task::Checkpoint,
])]);
get_node.warmup += Duration::from_millis(rng.gen_range(0..500));

(sub_node, get_node)
let mut livelinessget_node = base.clone();
livelinessget_node.name = "LivelinessGet Peer".to_string();
livelinessget_node.con_task = ConcurrentTask::from([SequentialTask::from([
Task::LivelinessGet(ke_getliveliness),
Task::Checkpoint,
])]);
livelinessget_node.warmup += Duration::from_millis(rng.gen_range(0..500));

let mut livelinesssub_node = base;
livelinesssub_node.name = "LivelinessSub Peer".to_string();
livelinesssub_node.con_task = ConcurrentTask::from([SequentialTask::from([
Task::LivelinessSub(ke_subliveliness),
Task::Checkpoint,
])]);
livelinesssub_node.warmup += Duration::from_millis(rng.gen_range(0..500));

(sub_node, get_node, livelinessget_node, livelinesssub_node)
};

(
Recipe::new([dummy_node.clone(), pub_node, sub_node]),
Recipe::new([dummy_node, queryable_node, get_node]),
Recipe::new([dummy_node.clone(), queryable_node, get_node]),
Recipe::new([dummy_node.clone(), liveliness_node, livelinessget_node]),
Recipe::new([dummy_node, livelinessloop_node, livelinesssub_node]),
)
})
.collect();

for chunks in recipe_list.chunks(PARALLEL_RECIPES).map(|x| x.to_vec()) {
let mut join_set = tokio::task::JoinSet::new();
for (pubsub, getqueryable) in chunks {
for (pubsub, getqueryable, getlivelienss, subliveliness) in chunks {
join_set.spawn(async move {
pubsub.run().await?;
getqueryable.run().await?;
getlivelienss.run().await?;
subliveliness.run().await?;
Result::Ok(())
});
}
Expand All @@ -1186,6 +1233,7 @@ async fn peer_linkstate() -> Result<()> {
Ok(())
}

#[cfg(feature = "unstable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 9)]
async fn router_linkstate() -> Result<()> {
zenoh_util::try_init_log_from_env();
Expand Down Expand Up @@ -1213,6 +1261,8 @@ async fn router_linkstate() -> Result<()> {

let ke_pubsub = format!("router_linkstate_keyexpr_pubsub_{idx}");
let ke_getqueryable = format!("router_linkstate_keyexpr_getqueryable_{idx}");
let ke_subliveliness = format!("router_linkstate_keyexpr_subliveliness_{idx}");
let ke_getliveliness = format!("router_linkstate_keyexpr_getliveliness_{idx}");

let router1_node = Node {
name: "Router 1".to_string(),
Expand Down Expand Up @@ -1243,7 +1293,7 @@ async fn router_linkstate() -> Result<()> {
..Default::default()
};

let (pub_node, queryable_node) = {
let (pub_node, queryable_node, liveliness_node, livelinessloop_node) = {
let base = Node {
mode: WhatAmI::Client,
connect: vec![locator1.clone()],
Expand All @@ -1258,18 +1308,37 @@ async fn router_linkstate() -> Result<()> {
msg_size,
)])]);

let mut queryable_node = base;
let mut queryable_node = base.clone();
queryable_node.name = "Queryable Client".to_string();
queryable_node.con_task =
ConcurrentTask::from([SequentialTask::from([Task::Queryable(
ke_getqueryable.clone(),
msg_size,
)])]);

(pub_node, queryable_node)
let mut liveliness_node = base.clone();
liveliness_node.name = "Liveliness Client".to_string();
liveliness_node.con_task =
ConcurrentTask::from([SequentialTask::from([Task::Liveliness(
ke_getliveliness.clone(),
)])]);

let mut livelinessloop_node = base;
livelinessloop_node.name = "LivelinessLoop Client".to_string();
livelinessloop_node.con_task =
ConcurrentTask::from([SequentialTask::from([Task::LivelinessLoop(
ke_subliveliness.clone(),
)])]);

(
pub_node,
queryable_node,
liveliness_node,
livelinessloop_node,
)
};

let (sub_node, get_node) = {
let (sub_node, get_node, livelinessget_node, livelinesssub_node) = {
let base = Node {
mode: WhatAmI::Client,
connect: vec![locator3],
Expand All @@ -1284,14 +1353,28 @@ async fn router_linkstate() -> Result<()> {
Task::Checkpoint,
])]);

let mut get_node = base;
let mut get_node = base.clone();
get_node.name = "Get Client".to_string();
get_node.con_task = ConcurrentTask::from([SequentialTask::from([
Task::Get(ke_getqueryable, msg_size),
Task::Checkpoint,
])]);

(sub_node, get_node)
let mut livelinessget_node = base.clone();
livelinessget_node.name = "LivelinessGet Client".to_string();
livelinessget_node.con_task = ConcurrentTask::from([SequentialTask::from([
Task::LivelinessGet(ke_getliveliness),
Task::Checkpoint,
])]);

let mut livelinesssub_node = base;
livelinesssub_node.name = "LivelinessSub Client".to_string();
livelinesssub_node.con_task = ConcurrentTask::from([SequentialTask::from([
Task::LivelinessSub(ke_subliveliness),
Task::Checkpoint,
])]);

(sub_node, get_node, livelinessget_node, livelinesssub_node)
};

(
Expand All @@ -1302,23 +1385,39 @@ async fn router_linkstate() -> Result<()> {
pub_node,
sub_node,
]),
Recipe::new([
router1_node.clone(),
router2_node.clone(),
router3_node.clone(),
queryable_node,
get_node,
]),
Recipe::new([
router1_node.clone(),
router2_node.clone(),
router3_node.clone(),
liveliness_node,
livelinessget_node,
]),
Recipe::new([
router1_node,
router2_node,
router3_node,
queryable_node,
get_node,
livelinessloop_node,
livelinesssub_node,
]),
)
})
.collect();

for chunks in recipe_list.chunks(PARALLEL_RECIPES).map(|x| x.to_vec()) {
for chunks in recipe_list.chunks(1).map(|x| x.to_vec()) {
let mut join_set = tokio::task::JoinSet::new();
for (pubsub, getqueryable) in chunks {
for (pubsub, getqueryable, getlivelienss, subliveliness) in chunks {
join_set.spawn(async move {
pubsub.run().await?;
getqueryable.run().await?;
getlivelienss.run().await?;
subliveliness.run().await?;
Result::Ok(())
});
}
Expand Down

0 comments on commit a0c775d

Please sign in to comment.