From a6f92ca1622af801df688aedce1eb4c0dd53d846 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 6 Dec 2024 14:23:56 -0700 Subject: [PATCH] fix: faster tests --- p2p/tests/node.rs | 70 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 14 deletions(-) diff --git a/p2p/tests/node.rs b/p2p/tests/node.rs index 20e9a111..e40ef6ca 100644 --- a/p2p/tests/node.rs +++ b/p2p/tests/node.rs @@ -10,7 +10,7 @@ use recon::{FullInterests, Recon, ReconInterestProvider}; use test_log::test; use ceramic_p2p::{Config, Metrics, NetworkEvent, Node, PeerKeyInterests}; -use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +use tokio::{sync::mpsc::Receiver, task::JoinHandle, time::timeout}; use tracing::debug; #[derive(Debug)] @@ -78,7 +78,7 @@ impl TestRunnerBuilder { let client = client.try_p2p()?; - let addr = tokio::time::timeout(Duration::from_millis(500), get_addr_loop(client.clone())) + let addr = timeout(Duration::from_millis(500), get_addr_loop(client.clone())) .await .context("timed out before getting a listening address for the node")??; Ok(TestRunner { @@ -130,8 +130,8 @@ async fn test_local_peer_id() -> Result<()> { #[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] async fn test_two_nodes() -> Result<()> { - let test_runner_a = TestRunnerBuilder::new().build().await?; - let test_runner_b = TestRunnerBuilder::new().build().await?; + let mut test_runner_a = TestRunnerBuilder::new().build().await?; + let mut test_runner_b = TestRunnerBuilder::new().build().await?; let addrs_b = vec![test_runner_b.addr.clone()]; debug!(?test_runner_a.peer_id, ?test_runner_b.peer_id, "peer ids"); @@ -143,19 +143,23 @@ async fn test_two_nodes() -> Result<()> { // connect test_runner_a.client.connect(peer_id_b, addrs_b).await?; - // Make sure the peers have had time to negotiate protocols - tokio::time::sleep(Duration::from_millis(3_000)).await; + timeout( + Duration::from_millis(500), + wait_bi_connected(&mut test_runner_a, &mut test_runner_b), + ) + .await + .context("waiting for a and b to connect")?; - is_bi_connected(&test_runner_a, &test_runner_b).await?; + assert!(is_bi_connected(&test_runner_a, &test_runner_b).await?); Ok(()) } #[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] async fn test_three_nodes() -> Result<()> { - let test_runner_a = TestRunnerBuilder::new().build().await?; - let test_runner_b = TestRunnerBuilder::new().build().await?; - let test_runner_c = TestRunnerBuilder::new().build().await?; + let mut test_runner_a = TestRunnerBuilder::new().build().await?; + let mut test_runner_b = TestRunnerBuilder::new().build().await?; + let mut test_runner_c = TestRunnerBuilder::new().build().await?; let peer_id_a = test_runner_a.client.local_peer_id().await?; assert_eq!(test_runner_a.peer_id, peer_id_a); @@ -169,18 +173,37 @@ async fn test_three_nodes() -> Result<()> { .client .connect(peer_id_b, vec![test_runner_b.addr.clone()]) .await?; + + timeout( + Duration::from_millis(500), + wait_bi_connected(&mut test_runner_a, &mut test_runner_b), + ) + .await + .context("waiting for a and b to connect")?; + assert!(is_bi_connected(&test_runner_a, &test_runner_b).await?); + // connect b to c test_runner_b .client .connect(peer_id_c, vec![test_runner_c.addr.clone()]) .await?; - // Make sure the peers have had time to negotiate protocols - tokio::time::sleep(Duration::from_millis(3_000)).await; - - assert!(is_bi_connected(&test_runner_a, &test_runner_b).await?); + timeout( + Duration::from_millis(500), + wait_bi_connected(&mut test_runner_b, &mut test_runner_c), + ) + .await + .context("waiting for b and c to connect")?; assert!(is_bi_connected(&test_runner_b, &test_runner_c).await?); + // We expect that a and c find each other through b and become connected + timeout( + //This one takes longer as we need recon to run and sync peers + Duration::from_millis(3_000), + wait_bi_connected(&mut test_runner_a, &mut test_runner_c), + ) + .await + .context("waiting for a and c to connect")?; assert!(is_bi_connected(&test_runner_a, &test_runner_c).await?); Ok(()) @@ -196,6 +219,25 @@ async fn is_connected(a: &TestRunner, b: &TestRunner) -> Result { Ok(peers.contains_key(&b.peer_id)) } +// Waits until a emits event that it is connected to b +// and b emits an event that it is connected to a. +async fn wait_bi_connected(a: &mut TestRunner, b: &mut TestRunner) { + wait_connected(a, b).await; + wait_connected(b, a).await; +} +// Waits until a emits event that it is connected to b +// and b emits an event that it is connected to a. +async fn wait_connected(a: &mut TestRunner, b: &mut TestRunner) { + while let Some(event) = a.network_events.recv().await { + match event { + NetworkEvent::PeerConnected(id) if id == b.peer_id => { + break; + } + _ => {} + } + } +} + #[test(tokio::test)] async fn test_cancel_listen_for_identify() -> Result<()> { let mut test_runner_a = TestRunnerBuilder::new().build().await?;