Skip to content

Commit

Permalink
fix: faster tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Dec 6, 2024
1 parent baea692 commit a6f92ca
Showing 1 changed file with 56 additions and 14 deletions.
70 changes: 56 additions & 14 deletions p2p/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use recon::{FullInterests, Recon, ReconInterestProvider};
use test_log::test;

use ceramic_p2p::{Config, Metrics, NetworkEvent, Node, PeerKeyInterests};
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use tokio::{sync::mpsc::Receiver, task::JoinHandle, time::timeout};
use tracing::debug;

#[derive(Debug)]
Expand Down Expand Up @@ -78,7 +78,7 @@ impl TestRunnerBuilder {

let client = client.try_p2p()?;

let addr = tokio::time::timeout(Duration::from_millis(500), get_addr_loop(client.clone()))
let addr = timeout(Duration::from_millis(500), get_addr_loop(client.clone()))
.await
.context("timed out before getting a listening address for the node")??;
Ok(TestRunner {
Expand Down Expand Up @@ -130,8 +130,8 @@ async fn test_local_peer_id() -> Result<()> {

#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn test_two_nodes() -> Result<()> {
let test_runner_a = TestRunnerBuilder::new().build().await?;
let test_runner_b = TestRunnerBuilder::new().build().await?;
let mut test_runner_a = TestRunnerBuilder::new().build().await?;
let mut test_runner_b = TestRunnerBuilder::new().build().await?;
let addrs_b = vec![test_runner_b.addr.clone()];
debug!(?test_runner_a.peer_id, ?test_runner_b.peer_id, "peer ids");

Expand All @@ -143,19 +143,23 @@ async fn test_two_nodes() -> Result<()> {
// connect
test_runner_a.client.connect(peer_id_b, addrs_b).await?;

// Make sure the peers have had time to negotiate protocols
tokio::time::sleep(Duration::from_millis(3_000)).await;
timeout(
Duration::from_millis(500),
wait_bi_connected(&mut test_runner_a, &mut test_runner_b),
)
.await
.context("waiting for a and b to connect")?;

is_bi_connected(&test_runner_a, &test_runner_b).await?;
assert!(is_bi_connected(&test_runner_a, &test_runner_b).await?);

Ok(())
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn test_three_nodes() -> Result<()> {
let test_runner_a = TestRunnerBuilder::new().build().await?;
let test_runner_b = TestRunnerBuilder::new().build().await?;
let test_runner_c = TestRunnerBuilder::new().build().await?;
let mut test_runner_a = TestRunnerBuilder::new().build().await?;
let mut test_runner_b = TestRunnerBuilder::new().build().await?;
let mut test_runner_c = TestRunnerBuilder::new().build().await?;

let peer_id_a = test_runner_a.client.local_peer_id().await?;
assert_eq!(test_runner_a.peer_id, peer_id_a);
Expand All @@ -169,18 +173,37 @@ async fn test_three_nodes() -> Result<()> {
.client
.connect(peer_id_b, vec![test_runner_b.addr.clone()])
.await?;

timeout(
Duration::from_millis(500),
wait_bi_connected(&mut test_runner_a, &mut test_runner_b),
)
.await
.context("waiting for a and b to connect")?;
assert!(is_bi_connected(&test_runner_a, &test_runner_b).await?);

// connect b to c
test_runner_b
.client
.connect(peer_id_c, vec![test_runner_c.addr.clone()])
.await?;

// Make sure the peers have had time to negotiate protocols
tokio::time::sleep(Duration::from_millis(3_000)).await;

assert!(is_bi_connected(&test_runner_a, &test_runner_b).await?);
timeout(
Duration::from_millis(500),
wait_bi_connected(&mut test_runner_b, &mut test_runner_c),
)
.await
.context("waiting for b and c to connect")?;
assert!(is_bi_connected(&test_runner_b, &test_runner_c).await?);

// We expect that a and c find each other through b and become connected
timeout(
//This one takes longer as we need recon to run and sync peers
Duration::from_millis(3_000),
wait_bi_connected(&mut test_runner_a, &mut test_runner_c),
)
.await
.context("waiting for a and c to connect")?;
assert!(is_bi_connected(&test_runner_a, &test_runner_c).await?);

Ok(())
Expand All @@ -196,6 +219,25 @@ async fn is_connected(a: &TestRunner, b: &TestRunner) -> Result<bool> {
Ok(peers.contains_key(&b.peer_id))
}

// Waits until a emits event that it is connected to b
// and b emits an event that it is connected to a.
async fn wait_bi_connected(a: &mut TestRunner, b: &mut TestRunner) {
wait_connected(a, b).await;
wait_connected(b, a).await;
}
// Waits until a emits event that it is connected to b
// and b emits an event that it is connected to a.
async fn wait_connected(a: &mut TestRunner, b: &mut TestRunner) {
while let Some(event) = a.network_events.recv().await {
match event {
NetworkEvent::PeerConnected(id) if id == b.peer_id => {
break;
}
_ => {}
}
}
}

#[test(tokio::test)]
async fn test_cancel_listen_for_identify() -> Result<()> {
let mut test_runner_a = TestRunnerBuilder::new().build().await?;
Expand Down

0 comments on commit a6f92ca

Please sign in to comment.