From 8858d4c40ec58bbef7d5f1ca2c4ae2f27c9f6519 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 20 Aug 2024 18:23:52 +0200 Subject: [PATCH] Add linkstate liveliness tests --- zenoh/tests/routing.rs | 137 +++++++++++++++++++++++++++++++++++------ 1 file changed, 117 insertions(+), 20 deletions(-) diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index dac1606024..d34e52ccc0 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -230,13 +230,16 @@ impl Task { loop { tokio::select! { _ = token.cancelled() => break, - res = sub.recv_async() => { - if res.is_ok() { + res = tokio::time::timeout(TIMEOUT, sub.recv_async()) => { + if res?.is_ok() { counter += 1; if counter >= MSG_COUNT { println!("LivelinessSub received sufficient amount of messages. Done."); break; } + } else { + println!("LivelinessSub recv error."); + break; } } } @@ -1089,6 +1092,8 @@ async fn peer_linkstate() -> Result<()> { let ke_pubsub = format!("peer_linkstate_keyexpr_pubsub_{idx}"); let ke_getqueryable = format!("peer_linkstate_keyexpr_getqueryable_{idx}"); + let ke_subliveliness = format!("peer_linkstate_keyexpr_subliveliness_{idx}"); + let ke_getliveliness = format!("peer_linkstate_keyexpr_getliveliness_{idx}"); use rand::Rng; let mut rng = rand::thread_rng(); @@ -1103,7 +1108,7 @@ async fn peer_linkstate() -> Result<()> { ..Default::default() }; - let (pub_node, queryable_node) = { + let (pub_node, queryable_node, liveliness_node, livelinessloop_node) = { let base = Node { mode: WhatAmI::Peer, connect: vec![locator.clone()], @@ -1120,7 +1125,7 @@ async fn peer_linkstate() -> Result<()> { )])]); pub_node.warmup += Duration::from_millis(rng.gen_range(0..500)); - let mut queryable_node = base; + let mut queryable_node = base.clone(); queryable_node.name = "Queryable Peer".to_string(); queryable_node.con_task = ConcurrentTask::from([SequentialTask::from([Task::Queryable( @@ -1129,10 +1134,31 @@ async fn peer_linkstate() -> Result<()> { )])]); queryable_node.warmup += Duration::from_millis(rng.gen_range(0..500)); - (pub_node, queryable_node) + let mut liveliness_node = base.clone(); + liveliness_node.name = "Liveliness Peer".to_string(); + liveliness_node.con_task = + ConcurrentTask::from([SequentialTask::from([Task::Liveliness( + ke_getliveliness.clone(), + )])]); + liveliness_node.warmup += Duration::from_millis(rng.gen_range(0..500)); + + let mut livelinessloop_node = base; + livelinessloop_node.name = "LivelinessLoop Peer".to_string(); + livelinessloop_node.con_task = + ConcurrentTask::from([SequentialTask::from([Task::LivelinessLoop( + ke_subliveliness.clone(), + )])]); + livelinessloop_node.warmup += Duration::from_millis(rng.gen_range(0..500)); + + ( + pub_node, + queryable_node, + liveliness_node, + livelinessloop_node, + ) }; - let (sub_node, get_node) = { + let (sub_node, get_node, livelinessget_node, livelinesssub_node) = { let base = Node { mode: WhatAmI::Peer, connect: vec![locator], @@ -1149,7 +1175,7 @@ async fn peer_linkstate() -> Result<()> { ])]); sub_node.warmup += Duration::from_millis(rng.gen_range(0..500)); - let mut get_node = base; + let mut get_node = base.clone(); get_node.name = "Get Peer".to_string(); get_node.con_task = ConcurrentTask::from([SequentialTask::from([ Task::Get(ke_getqueryable, msg_size), @@ -1157,22 +1183,42 @@ async fn peer_linkstate() -> Result<()> { ])]); get_node.warmup += Duration::from_millis(rng.gen_range(0..500)); - (sub_node, get_node) + let mut livelinessget_node = base.clone(); + livelinessget_node.name = "LivelinessGet Peer".to_string(); + livelinessget_node.con_task = ConcurrentTask::from([SequentialTask::from([ + Task::LivelinessGet(ke_getliveliness), + Task::Checkpoint, + ])]); + livelinessget_node.warmup += Duration::from_millis(rng.gen_range(0..500)); + + let mut livelinesssub_node = base; + livelinesssub_node.name = "LivelinessSub Peer".to_string(); + livelinesssub_node.con_task = ConcurrentTask::from([SequentialTask::from([ + Task::LivelinessSub(ke_subliveliness), + Task::Checkpoint, + ])]); + livelinesssub_node.warmup += Duration::from_millis(rng.gen_range(0..500)); + + (sub_node, get_node, livelinessget_node, livelinesssub_node) }; ( Recipe::new([dummy_node.clone(), pub_node, sub_node]), - Recipe::new([dummy_node, queryable_node, get_node]), + Recipe::new([dummy_node.clone(), queryable_node, get_node]), + Recipe::new([dummy_node.clone(), liveliness_node, livelinessget_node]), + Recipe::new([dummy_node, livelinessloop_node, livelinesssub_node]), ) }) .collect(); for chunks in recipe_list.chunks(PARALLEL_RECIPES).map(|x| x.to_vec()) { let mut join_set = tokio::task::JoinSet::new(); - for (pubsub, getqueryable) in chunks { + for (pubsub, getqueryable, getlivelienss, subliveliness) in chunks { join_set.spawn(async move { pubsub.run().await?; getqueryable.run().await?; + getlivelienss.run().await?; + subliveliness.run().await?; Result::Ok(()) }); } @@ -1213,6 +1259,8 @@ async fn router_linkstate() -> Result<()> { let ke_pubsub = format!("router_linkstate_keyexpr_pubsub_{idx}"); let ke_getqueryable = format!("router_linkstate_keyexpr_getqueryable_{idx}"); + let ke_subliveliness = format!("router_linkstate_keyexpr_subliveliness_{idx}"); + let ke_getliveliness = format!("router_linkstate_keyexpr_getliveliness_{idx}"); let router1_node = Node { name: "Router 1".to_string(), @@ -1243,7 +1291,7 @@ async fn router_linkstate() -> Result<()> { ..Default::default() }; - let (pub_node, queryable_node) = { + let (pub_node, queryable_node, liveliness_node, livelinessloop_node) = { let base = Node { mode: WhatAmI::Client, connect: vec![locator1.clone()], @@ -1258,7 +1306,7 @@ async fn router_linkstate() -> Result<()> { msg_size, )])]); - let mut queryable_node = base; + let mut queryable_node = base.clone(); queryable_node.name = "Queryable Client".to_string(); queryable_node.con_task = ConcurrentTask::from([SequentialTask::from([Task::Queryable( @@ -1266,10 +1314,29 @@ async fn router_linkstate() -> Result<()> { msg_size, )])]); - (pub_node, queryable_node) + let mut liveliness_node = base.clone(); + liveliness_node.name = "Liveliness Client".to_string(); + liveliness_node.con_task = + ConcurrentTask::from([SequentialTask::from([Task::Liveliness( + ke_getliveliness.clone(), + )])]); + + let mut livelinessloop_node = base; + livelinessloop_node.name = "LivelinessLoop Client".to_string(); + livelinessloop_node.con_task = + ConcurrentTask::from([SequentialTask::from([Task::LivelinessLoop( + ke_subliveliness.clone(), + )])]); + + ( + pub_node, + queryable_node, + liveliness_node, + livelinessloop_node, + ) }; - let (sub_node, get_node) = { + let (sub_node, get_node, livelinessget_node, livelinesssub_node) = { let base = Node { mode: WhatAmI::Client, connect: vec![locator3], @@ -1284,14 +1351,28 @@ async fn router_linkstate() -> Result<()> { Task::Checkpoint, ])]); - let mut get_node = base; + let mut get_node = base.clone(); get_node.name = "Get Client".to_string(); get_node.con_task = ConcurrentTask::from([SequentialTask::from([ Task::Get(ke_getqueryable, msg_size), Task::Checkpoint, ])]); - (sub_node, get_node) + let mut livelinessget_node = base.clone(); + livelinessget_node.name = "LivelinessGet Client".to_string(); + livelinessget_node.con_task = ConcurrentTask::from([SequentialTask::from([ + Task::LivelinessGet(ke_getliveliness), + Task::Checkpoint, + ])]); + + let mut livelinesssub_node = base; + livelinesssub_node.name = "LivelinessSub Client".to_string(); + livelinesssub_node.con_task = ConcurrentTask::from([SequentialTask::from([ + Task::LivelinessSub(ke_subliveliness), + Task::Checkpoint, + ])]); + + (sub_node, get_node, livelinessget_node, livelinesssub_node) }; ( @@ -1302,23 +1383,39 @@ async fn router_linkstate() -> Result<()> { pub_node, sub_node, ]), + Recipe::new([ + router1_node.clone(), + router2_node.clone(), + router3_node.clone(), + queryable_node, + get_node, + ]), + Recipe::new([ + router1_node.clone(), + router2_node.clone(), + router3_node.clone(), + liveliness_node, + livelinessget_node, + ]), Recipe::new([ router1_node, router2_node, router3_node, - queryable_node, - get_node, + livelinessloop_node, + livelinesssub_node, ]), ) }) .collect(); - for chunks in recipe_list.chunks(PARALLEL_RECIPES).map(|x| x.to_vec()) { + for chunks in recipe_list.chunks(1).map(|x| x.to_vec()) { let mut join_set = tokio::task::JoinSet::new(); - for (pubsub, getqueryable) in chunks { + for (pubsub, getqueryable, getlivelienss, subliveliness) in chunks { join_set.spawn(async move { pubsub.run().await?; getqueryable.run().await?; + getlivelienss.run().await?; + subliveliness.run().await?; Result::Ok(()) }); }