diff --git a/spectrum-crypto/src/digest.rs b/spectrum-crypto/src/digest.rs index 55f8e89a..a60c5652 100644 --- a/spectrum-crypto/src/digest.rs +++ b/spectrum-crypto/src/digest.rs @@ -7,7 +7,7 @@ use std::marker::PhantomData; use serde::{Deserialize, Serialize, Serializer}; use thiserror::Error; -#[derive(Eq, PartialEq, Copy, Clone, Ord, PartialOrd)] +#[derive(Debug, Eq, PartialEq, Copy, Clone, Ord, PartialOrd)] pub struct Blake2b; #[derive(Eq, PartialEq, Copy, Clone, Ord, PartialOrd)] diff --git a/spectrum-network/Cargo.toml b/spectrum-network/Cargo.toml index 5095ad14..752662d9 100644 --- a/spectrum-network/Cargo.toml +++ b/spectrum-network/Cargo.toml @@ -12,7 +12,7 @@ integration_tests = [] algebra-core = { version = "0.1.0", path = "../algebra-core" } spectrum-crypto = { version = "0.1.0", path = "../spectrum-crypto" } libp2p = { version = "0.51.*", features = ["noise", "yamux", "secp256k1"] } -libp2p-identity = "0.1.1" +libp2p-identity = { version = "0.1.1", features = ["secp256k1"]} futures = "0.3.21" async-std = { version = "1.10.0", features = ["attributes"] } unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_codec"] } @@ -39,4 +39,5 @@ group = "0.13.*" nonempty = "0.8.1" [dev-dependencies] -libp2p = { version = "0.51.*", features = ["noise", "yamux", "async-std"] } +libp2p = { version = "0.51.*", features = ["noise", "yamux", "async-std", "secp256k1"] } +log4rs_test_utils = {version = "0.2.3", featuers = ["test_logging"]} diff --git a/spectrum-network/src/network_controller.rs b/spectrum-network/src/network_controller.rs index e6d11570..6b55b4ac 100644 --- a/spectrum-network/src/network_controller.rs +++ b/spectrum-network/src/network_controller.rs @@ -14,7 +14,7 @@ use libp2p::swarm::{ NetworkBehaviour, NotifyHandler, PollParameters, ToSwarm, }; use libp2p::{Multiaddr, PeerId}; -use log::{trace, warn}; +use log::{error, trace, warn}; use crate::one_shot_upgrade::OneShotMessage; use crate::peer_conn_handler::message_sink::MessageSink; @@ -162,6 +162,7 @@ impl NetworkAPI for NetworkMailbox { ); } fn send_one_shot_message(&self, peer: PeerId, protocol: ProtocolTag, message: RawMessage) { + warn!("Sending one shot message!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); let _ = futures::executor::block_on(self.mailbox_snd.clone().send( NetworkControllerIn::SendOneShotMessage { peer, @@ -311,6 +312,7 @@ where } } ProtocolConfig::OneShot(one_shot) => { + warn!("init_conn_handler: {:?}", one_shot); one_shot_protocols.insert( *protocol_id, OneShotProtocol { @@ -450,6 +452,7 @@ where FromSwarm::DialFailure(DialFailure { peer_id, .. }) => { if let Some(peer_id) = peer_id { + warn!("QQQQQQQ: dialfailure {:?}", peer_id); self.peers.dial_failure(peer_id); } } @@ -478,7 +481,7 @@ where out_channel, handshake, } => { - trace!("Protocol {} opened with peer {}", protocol_tag, peer_id); + error!("Protocol {} opened with peer {}", protocol_tag, peer_id); if let Some(ConnectedPeer::Connected { enabled_protocols, .. }) = self.enabled_peers.get_mut(&peer_id) @@ -487,7 +490,7 @@ where let protocol_ver = protocol_tag.protocol_ver(); match enabled_protocols.entry(protocol_id) { Entry::Occupied(mut entry) => { - trace!( + error!( "Current state of protocol {:?} is {:?}", protocol_id, entry.get().0 @@ -570,6 +573,7 @@ where protocol_tag, content, } => { + panic!("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS"); if let Some((_, han)) = self.supported_protocols.get(&protocol_tag.protocol_id()) { han.incoming_msg(peer_id, protocol_tag.protocol_ver(), content); } @@ -720,6 +724,7 @@ where Entry::Occupied(mut enabled_peer) => match enabled_peer.get_mut() { ConnectedPeer::Connected { conn_id, .. } | ConnectedPeer::PendingApprove(conn_id) => { + panic!("AAA"); // if the peer is enabled already we reuse existing connection self.pending_actions.push_back(ToSwarm::NotifyHandler { peer_id: peer, @@ -740,9 +745,12 @@ where content: message, }); } - ConnectedPeer::PendingDisconnect(_) => {} // todo: wait for disconnect; reconnect? + ConnectedPeer::PendingDisconnect(_) => { + panic!("CCC"); + } // todo: wait for disconnect; reconnect? }, Entry::Vacant(not_enabled_peer) => { + warn!("DDD"); self.pending_actions .push_back(ToSwarm::Dial { opts: peer.into() }); not_enabled_peer.insert(ConnectedPeer::PendingConnect { diff --git a/spectrum-network/src/peer_conn_handler.rs b/spectrum-network/src/peer_conn_handler.rs index 1a351adf..128653c7 100644 --- a/spectrum-network/src/peer_conn_handler.rs +++ b/spectrum-network/src/peer_conn_handler.rs @@ -16,7 +16,7 @@ use libp2p::swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol, }; use libp2p::PeerId; -use log::trace; +use log::{trace, warn}; use rand::rngs::OsRng; use rand::RngCore; @@ -460,7 +460,7 @@ impl ConnectionHandler for PeerConnHandler { protocol: future::Either::Right(rid), .. }) => { - trace!("{:?} has been fired", rid); + warn!("{:?} has been fired", rid); self.pending_one_shots.remove(&rid); } diff --git a/spectrum-network/src/peer_manager.rs b/spectrum-network/src/peer_manager.rs index eef186c2..21da08de 100644 --- a/spectrum-network/src/peer_manager.rs +++ b/spectrum-network/src/peer_manager.rs @@ -10,7 +10,7 @@ use futures::channel::{mpsc, oneshot}; use futures::{SinkExt, Stream}; use libp2p::swarm::ConnectionId; use libp2p::PeerId; -use log::{error, info, trace}; +use log::{error, info, trace, warn}; use wasm_timer::Delay; use crate::peer_conn_handler::ConnHandlerError; @@ -488,6 +488,7 @@ impl PeerManagerNotificationsBehavior for PeerManager { fn on_connection_established(&mut self, peer_id: PeerId, conn_id: ConnectionId) { if let Some(PeerInState::Connected(mut cp)) = self.state.peer(&peer_id) { + warn!("ConnectedPeer: confirming connection {:?}", cp); cp.confirm_connection(); } else { error!("Peer {} hasn't been acknowledged as connected", peer_id) diff --git a/spectrum-network/src/protocol_handler.rs b/spectrum-network/src/protocol_handler.rs index f4d4512a..6c3f7b6b 100644 --- a/spectrum-network/src/protocol_handler.rs +++ b/spectrum-network/src/protocol_handler.rs @@ -9,7 +9,7 @@ use futures::channel::mpsc::Receiver; use futures::Stream; pub use libp2p::swarm::NetworkBehaviour; use libp2p::PeerId; -use log::{error, trace}; +use log::{error, trace, warn}; use crate::network_controller::NetworkAPI; use crate::peer_conn_handler::message_sink::MessageSink; @@ -254,14 +254,17 @@ where continue; } Poll::Ready(None) => return Poll::Ready(None), // terminate, behaviour is exhausted - Poll::Pending => {} + Poll::Pending => { + warn!("Behaviour returned PENDING"); + } } // 2. Poll incoming events. if let Poll::Ready(Some(notif)) = Stream::poll_next(Pin::new(&mut self.inbox), cx) { + warn!("ProtocolHandler.inbox: {:?}", notif); match notif { ProtocolEvent::Connected(peer_id) => { - trace!("Connected {:?}", peer_id); + warn!("Connected {:?}", peer_id); self.behaviour.inject_peer_connected(peer_id); } ProtocolEvent::Message { @@ -269,6 +272,7 @@ where protocol_ver: negotiated_ver, content, } => { + warn!("ProtocolEvent::Message received"); if let Ok(msg) = codec::decode::< <::TProto as ProtocolSpec>::TMessage, >(content) @@ -330,6 +334,7 @@ where handshake, } => { self.peers.insert(peer_id, sink); + error!("peers: {:?}", self.peers); match handshake.map( codec::decode::< <::TProto as ProtocolSpec>::THandshake, diff --git a/spectrum-network/src/protocol_handler/handel.rs b/spectrum-network/src/protocol_handler/handel.rs index 7df525d8..1271ded6 100644 --- a/spectrum-network/src/protocol_handler/handel.rs +++ b/spectrum-network/src/protocol_handler/handel.rs @@ -7,6 +7,7 @@ use std::time::{Duration, Instant}; use either::{Either, Left, Right}; use libp2p::PeerId; +use log::{trace, warn}; use void::Void; use algebra_core::CommutativePartialSemigroup; @@ -97,6 +98,8 @@ pub struct Handel { level_activation_schedule: Vec>, } +pub type MakeHandel<'a, C, P, PP> = dyn Fn(HandelConfig, C, P, PP) -> Box>; + impl Handel where C: CommutativePartialSemigroup + Weighted + VerifiableAgainst

+ Eq + Clone + Debug, @@ -323,7 +326,7 @@ where /// Sends messages for one node from each active level. fn run_dissemination(&mut self) { let own_contrib = self.get_own_contribution(); - for (lix, lvl) in &mut self.levels.iter_mut().enumerate() { + for (lix, lvl) in &mut self.levels.iter_mut().enumerate().skip(1) { if let Some(active_lvl) = lvl { let peers_at_level = self.peer_partitions.peers_at_level(lix, PeerOrd::CVP); let maybe_next_peer = active_lvl @@ -470,9 +473,12 @@ where self.try_disseminatate(); self.try_activate_levels(); if let Some(out) = self.outbox.pop_front() { + warn!("Handel::poll: return Left({:?})", out); + return Poll::Ready(Left(out)); } if let Some(ca) = self.get_complete_aggregate() { + warn!("Handel::poll: return Right(ca)"); return Poll::Ready(Right(ca)); } Poll::Pending diff --git a/spectrum-network/src/protocol_handler/handel/partitioning.rs b/spectrum-network/src/protocol_handler/handel/partitioning.rs index 966566bd..cab38e88 100644 --- a/spectrum-network/src/protocol_handler/handel/partitioning.rs +++ b/spectrum-network/src/protocol_handler/handel/partitioning.rs @@ -95,8 +95,9 @@ pub struct BinomialPeerPartitions { rng: R, } +#[derive(Clone)] pub struct MakeBinomialPeerPartitions { - rng: R, + pub rng: R, } impl MakePeerPartitions for MakeBinomialPeerPartitions @@ -149,11 +150,15 @@ where .collect::>() }) .collect::>(); + let partitions_by_vp = ordered_by_vp(&rng, cleared_partitions.clone(), host_peer_ix); + let partitions_by_cvp = ordered_by_cvp(&rng, cleared_partitions, host_peer_ix, num_nodes); + println!("partitions_by_vp: {:?}", partitions_by_vp); + println!("partitions_by_cvp: {:?}", partitions_by_cvp); Self { peers: all_peers, peer_index: total_index, - partitions_by_vp: ordered_by_vp(&rng, cleared_partitions.clone(), host_peer_ix), - partitions_by_cvp: ordered_by_cvp(&rng, cleared_partitions, host_peer_ix, num_nodes), + partitions_by_vp, + partitions_by_cvp, rng, } } diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation.rs b/spectrum-network/src/protocol_handler/sigma_aggregation.rs index 75d079d5..432f85b2 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation.rs @@ -10,6 +10,7 @@ use futures::Stream; use k256::{Scalar, SecretKey}; use libp2p::PeerId; +use log::{trace, warn}; use spectrum_crypto::digest::Digest256; use crate::protocol::SIGMA_AGGR_PROTOCOL_ID; @@ -21,7 +22,7 @@ use crate::protocol_handler::sigma_aggregation::crypto::{ pre_commitment, response, schnorr_commitment_pair, }; use crate::protocol_handler::sigma_aggregation::message::{ - SigmaAggrMessage, SigmaAggrMessageV1, SigmaAggrSpec, + SigmaAggrMessage, SigmaAggrMessageV1, SigmaAggrSpec, SIGMA_AGGR_V1, }; use crate::protocol_handler::sigma_aggregation::types::{ AggregateCommitment, Commitment, CommitmentSecret, CommitmentsVerifInput, CommitmentsWithProofs, @@ -32,8 +33,8 @@ use crate::protocol_handler::ProtocolBehaviour; use crate::protocol_handler::ProtocolBehaviourOut; use crate::types::ProtocolId; -mod crypto; -mod message; +pub mod crypto; +pub mod message; pub mod types; struct AggregatePreCommitments<'a, H, PP> { @@ -53,12 +54,12 @@ struct AggregatePreCommitments<'a, H, PP> { host_commitment: Commitment, /// `σ_i`. Dlog proof of knowledge for `Y_i`. host_explusion_proof: Signature, - handel: Box>, + handel: Box + Send>, } impl<'a, H, PP> AggregatePreCommitments<'a, H, PP> where - PP: PeerPartitions + 'a, + PP: PeerPartitions + Send + 'a, { fn init>( host_sk: SecretKey, @@ -70,6 +71,7 @@ where let host_pk = PublicKey::from(host_sk.clone()); let host_pid = PeerId::from(host_pk); let peers = committee.iter().map(PeerId::from).collect(); + warn!("AggregatePreCommitments::init: peers = {:?}", peers); let partitions = partitioner.make(host_pid, peers); let committee_indexed = committee .iter() @@ -118,6 +120,7 @@ where pre_commitments, message_digest_bytes: self.message_digest.as_ref().to_vec(), }; + warn!("AggregatePreCommitments::complete-------------------------------------------"); AggregateSchnorrCommitments { host_sk: self.host_sk, host_ix: self.host_ix, @@ -154,12 +157,12 @@ struct AggregateSchnorrCommitments<'a, H, PP> { host_commitment: Commitment, /// `σ_i`. Dlog proof of knowledge for `Y_i`. host_explusion_proof: Signature, - handel: Box>, + handel: Box + Send>, } impl<'a, H, PP> AggregateSchnorrCommitments<'a, H, PP> where - PP: PeerPartitions + 'a, + PP: PeerPartitions + Send + 'a, { fn complete( self, @@ -231,7 +234,7 @@ struct AggregateResponses<'a, H, PP> { /// `σ_i`. Dlog proof of knowledge for `Y_i`. host_explusion_proof: Signature, commitments_with_proofs: CommitmentsWithProofs, - handel: Box>, + handel: Box + Send>, } impl<'a, H, PP> AggregateResponses<'a, H, PP> { @@ -284,11 +287,33 @@ where outbox: VecDeque>, } +impl<'a, H, MPP> SigmaAggregation<'a, H, MPP> +where + MPP: MakePeerPartitions + Clone + Send, +{ + pub fn new( + host_sk: SecretKey, + handel_conf: HandelConfig, + partitioner: MPP, + inbox: Receiver>, + outbox: VecDeque>, + ) -> Self { + Self { + host_sk, + handel_conf, + task: None, + partitioner, + inbox, + outbox, + } + } +} + impl<'a, H, MPP> ProtocolBehaviour for SigmaAggregation<'a, H, MPP> where H: Debug, - MPP: MakePeerPartitions + Clone, - MPP::PP: 'a, + MPP: MakePeerPartitions + Send + Clone, + MPP::PP: 'a + Send, { type TProto = SigmaAggrSpec; @@ -301,6 +326,7 @@ where peer_id: PeerId, SigmaAggrMessage::SigmaAggrMessageV1(msg): SigmaAggrMessage, ) { + warn!("SigmaAggregation::inject_message(): {:?}", msg); match &mut self.task { Some(AggregationTask { state: AggregationState::AggregatePreCommitments(ref mut pre_commitment), @@ -334,8 +360,10 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll>> { + warn!("SigmaAggregation::poll: ENTERING"); loop { if let Some(out) = self.outbox.pop_front() { + warn!("ZZZZ: {:?}", out); return Poll::Ready(Some(out)); } @@ -358,6 +386,7 @@ where }); } } + continue; } if let Some(task) = self.task.take() { @@ -366,74 +395,84 @@ where state: AggregationState::AggregatePreCommitments(mut st), channel, } => { + //println!("SigmaAggregation::poll: inbox received Reset({:?})", new_message); match st.handel.poll(cx) { - Poll::Ready(out) => match out { - Either::Left(cmd) => match cmd { - ProtocolBehaviourOut::Send { peer_id, message } => { - self.outbox.push_back(ProtocolBehaviourOut::Send { - peer_id, - message: SigmaAggrMessage::SigmaAggrMessageV1( - SigmaAggrMessageV1::PreCommitments(message), + Poll::Ready(out) => { + match out { + Either::Left(cmd) => match cmd { + ProtocolBehaviourOut::Send { peer_id, message } => { + warn!("AAAAAAAAAAAAAAAAAAAAAA"); + self.outbox.push_back(ProtocolBehaviourOut::NetworkAction( + NetworkAction::SendOneShotMessage { + peer: peer_id, + use_version: SIGMA_AGGR_V1, + message: SigmaAggrMessage::SigmaAggrMessageV1( + SigmaAggrMessageV1::PreCommitments(message), + ), + }, + )); + } + ProtocolBehaviourOut::NetworkAction(NetworkAction::BanPeer(pid)) => { + self.outbox.push_back(ProtocolBehaviourOut::NetworkAction( + NetworkAction::BanPeer(pid), + )); + } + ProtocolBehaviourOut::NetworkAction(_) => {} + }, + Either::Right(pre_commitments) => { + self.task = Some(AggregationTask { + state: AggregationState::AggregateSchnorrCommitments( + st.complete(pre_commitments, self.handel_conf), ), + channel, }); } - ProtocolBehaviourOut::NetworkAction(NetworkAction::BanPeer(pid)) => { - self.outbox.push_back(ProtocolBehaviourOut::NetworkAction( - NetworkAction::BanPeer(pid), - )); - } - ProtocolBehaviourOut::NetworkAction(_) => {} - }, - Either::Right(pre_commitments) => { - self.task = Some(AggregationTask { - state: AggregationState::AggregateSchnorrCommitments( - st.complete(pre_commitments, self.handel_conf), - ), - channel, - }); - continue; } - }, + continue; + } Poll::Pending => {} } self.task = Some(AggregationTask { state: AggregationState::AggregatePreCommitments(st), channel, }); + warn!("Taken task: AggregatePreCommitments"); } AggregationTask { state: AggregationState::AggregateSchnorrCommitments(mut st), channel, } => { match st.handel.poll(cx) { - Poll::Ready(out) => match out { - Either::Left(cmd) => match cmd { - ProtocolBehaviourOut::Send { peer_id, message } => { - self.outbox.push_back(ProtocolBehaviourOut::Send { - peer_id, - message: SigmaAggrMessage::SigmaAggrMessageV1( - SigmaAggrMessageV1::Commitments(message), + Poll::Ready(out) => { + match out { + Either::Left(cmd) => match cmd { + ProtocolBehaviourOut::Send { peer_id, message } => { + self.outbox.push_back(ProtocolBehaviourOut::Send { + peer_id, + message: SigmaAggrMessage::SigmaAggrMessageV1( + SigmaAggrMessageV1::Commitments(message), + ), + }); + } + ProtocolBehaviourOut::NetworkAction(NetworkAction::BanPeer(pid)) => { + self.outbox.push_back(ProtocolBehaviourOut::NetworkAction( + NetworkAction::BanPeer(pid), + )); + } + ProtocolBehaviourOut::NetworkAction(_) => {} + }, + + Either::Right(commitments) => { + self.task = Some(AggregationTask { + state: AggregationState::AggregateResponses( + st.complete(commitments, self.handel_conf), ), + channel, }); } - ProtocolBehaviourOut::NetworkAction(NetworkAction::BanPeer(pid)) => { - self.outbox.push_back(ProtocolBehaviourOut::NetworkAction( - NetworkAction::BanPeer(pid), - )); - } - ProtocolBehaviourOut::NetworkAction(_) => {} - }, - - Either::Right(commitments) => { - self.task = Some(AggregationTask { - state: AggregationState::AggregateResponses( - st.complete(commitments, self.handel_conf), - ), - channel, - }); - continue; } - }, + continue; + } Poll::Pending => {} } self.task = Some(AggregationTask { @@ -446,33 +485,35 @@ where channel, } => { match st.handel.poll(cx) { - Poll::Ready(out) => match out { - Either::Left(cmd) => match cmd { - ProtocolBehaviourOut::Send { peer_id, message } => { - self.outbox.push_back(ProtocolBehaviourOut::Send { - peer_id, - message: SigmaAggrMessage::SigmaAggrMessageV1( - SigmaAggrMessageV1::Responses(message), - ), - }); - } - ProtocolBehaviourOut::NetworkAction(NetworkAction::BanPeer(pid)) => { - self.outbox.push_back(ProtocolBehaviourOut::NetworkAction( - NetworkAction::BanPeer(pid), - )); - } - ProtocolBehaviourOut::NetworkAction(_) => {} - }, - Either::Right(responses) => { - self.task = None; - let res = st.complete(responses); - // todo: support error case. - if channel.send(Ok(res)).is_err() { - // warn here. + Poll::Ready(out) => { + match out { + Either::Left(cmd) => match cmd { + ProtocolBehaviourOut::Send { peer_id, message } => { + self.outbox.push_back(ProtocolBehaviourOut::Send { + peer_id, + message: SigmaAggrMessage::SigmaAggrMessageV1( + SigmaAggrMessageV1::Responses(message), + ), + }); + } + ProtocolBehaviourOut::NetworkAction(NetworkAction::BanPeer(pid)) => { + self.outbox.push_back(ProtocolBehaviourOut::NetworkAction( + NetworkAction::BanPeer(pid), + )); + } + ProtocolBehaviourOut::NetworkAction(_) => {} + }, + Either::Right(responses) => { + self.task = None; + let res = st.complete(responses); + // todo: support error case. + if channel.send(Ok(res)).is_err() { + // warn here. + } } - continue; } - }, + continue; + } Poll::Pending => {} } self.task = Some(AggregationTask { @@ -482,6 +523,7 @@ where } } } + warn!("SigmaAggregation::poll: EXITING"); return Poll::Pending; } } diff --git a/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs b/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs index 84a452a6..4ca5b492 100644 --- a/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs +++ b/spectrum-network/src/protocol_handler/sigma_aggregation/message.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use crate::protocol_handler::handel::message::HandelMessage; -use crate::protocol_handler::sigma_aggregation::types::{CommitmentsWithProofs, Responses, PreCommitments}; +use crate::protocol_handler::sigma_aggregation::types::{CommitmentsWithProofs, PreCommitments, Responses}; use crate::protocol_handler::versioning::Versioned; use crate::protocol_handler::ProtocolSpec; use crate::types::ProtocolVer; diff --git a/spectrum-network/tests/integration_tests/mod.rs b/spectrum-network/tests/integration_tests/mod.rs index 7af0b074..9924670c 100644 --- a/spectrum-network/tests/integration_tests/mod.rs +++ b/spectrum-network/tests/integration_tests/mod.rs @@ -1,13 +1,22 @@ -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + time::Duration, +}; +use elliptic_curve::rand_core::OsRng; use futures::channel::mpsc::Sender; use futures::{ channel::{mpsc, oneshot}, SinkExt, StreamExt, }; +use k256::SecretKey; use libp2p::swarm::SwarmBuilder; use libp2p::{identity, swarm::SwarmEvent, Multiaddr, PeerId, Swarm}; +use log::{warn, LevelFilter}; +use log4rs_test_utils::test_logging::init_logging_once_for; +use rand::Rng; +use spectrum_crypto::digest::{Blake2b, Digest256}; use spectrum_network::protocol::{OneShotProtocolConfig, OneShotProtocolSpec, ProtocolConfig}; use spectrum_network::protocol_api::ProtocolEvent; use spectrum_network::types::{ProtocolTag, RawMessage}; @@ -19,9 +28,19 @@ use spectrum_network::{ peers_state::PeerRepo, NetworkingConfig, PeerManager, PeerManagerConfig, PeersMailbox, }, - protocol::{StatefulProtocolConfig, StatefulProtocolSpec, SYNC_PROTOCOL_ID}, + protocol::{StatefulProtocolConfig, StatefulProtocolSpec, SIGMA_AGGR_PROTOCOL_ID, SYNC_PROTOCOL_ID}, protocol_api::ProtocolMailbox, protocol_handler::{ + aggregation::AggregationAction, + handel::{ + partitioning::{MakeBinomialPeerPartitions, PseudoRandomGenPerm}, + HandelConfig, Threshold, + }, + sigma_aggregation::{ + message::{SigmaAggrMessage, SigmaAggrSpec, SIGMA_AGGR_V1}, + types::PublicKey, + SigmaAggregation, + }, sync::{ message::{SyncMessage, SyncMessageV1, SyncSpec}, NodeStatus, SyncBehaviour, @@ -106,12 +125,22 @@ async fn one_shot_messaging() { // Though we spawn multiple tasks we use this single channel for messaging. let (msg_tx, _msg_rx) = mpsc::channel::<(Peer, Msg)>(10); - let (abortable_peer_0, handle_0) = futures::future::abortable( - create_swarm::>(local_key_0, nc_0, addr_0, Peer::First, msg_tx.clone()), - ); - let (abortable_peer_1, handle_1) = futures::future::abortable( - create_swarm::>(local_key_1, nc_1, addr_1, Peer::Second, msg_tx), - ); + let (abortable_peer_0, handle_0) = + futures::future::abortable(create_swarm::, Peer>( + local_key_0, + nc_0, + addr_0, + Peer::First, + msg_tx.clone(), + )); + let (abortable_peer_1, handle_1) = + futures::future::abortable(create_swarm::, Peer>( + local_key_1, + nc_1, + addr_1, + Peer::Second, + msg_tx, + )); let (cancel_tx_0, cancel_rx_0) = oneshot::channel::<()>(); let (cancel_tx_1, cancel_rx_1) = oneshot::channel::<()>(); @@ -209,8 +238,10 @@ async fn integration_test_0() { // Though we spawn multiple tasks we use this single channel for messaging. let (msg_tx, mut msg_rx) = mpsc::channel::<(Peer, Msg)>(10); - let (mut sync_handler_0, nc_0) = make_swarm_components(peers_0, sync_behaviour_0, 10); - let (mut sync_handler_1, nc_1) = make_swarm_components(peers_1, sync_behaviour_1, 10); + let (mut sync_handler_0, nc_0) = + make_swarm_components(peers_0, sync_behaviour_0, SYNC_PROTOCOL_ID, SyncSpec::v1(), 10); + let (mut sync_handler_1, nc_1) = + make_swarm_components(peers_1, sync_behaviour_1, SYNC_PROTOCOL_ID, SyncSpec::v1(), 10); let mut msg_tx_sync_handler_0 = msg_tx.clone(); let sync_handler_0_handle = async_std::task::spawn(async move { @@ -232,12 +263,22 @@ async fn integration_test_0() { } }); - let (abortable_peer_0, handle_0) = futures::future::abortable( - create_swarm::>(local_key_0, nc_0, addr_0, Peer::First, msg_tx.clone()), - ); - let (abortable_peer_1, handle_1) = futures::future::abortable( - create_swarm::>(local_key_1, nc_1, addr_1, Peer::Second, msg_tx), - ); + let (abortable_peer_0, handle_0) = + futures::future::abortable(create_swarm::, Peer>( + local_key_0, + nc_0, + addr_0, + Peer::First, + msg_tx.clone(), + )); + let (abortable_peer_1, handle_1) = + futures::future::abortable(create_swarm::, Peer>( + local_key_1, + nc_1, + addr_1, + Peer::Second, + msg_tx, + )); let (cancel_tx_0, cancel_rx_0) = oneshot::channel::<()>(); let (cancel_tx_1, cancel_rx_1) = oneshot::channel::<()>(); @@ -377,8 +418,10 @@ async fn integration_test_1() { let (msg_tx, msg_rx) = mpsc::channel::<(Peer, Msg)>(10); let (fake_msg_tx, fake_msg_rx) = mpsc::channel::<(Peer, Msg)>(10); - let (mut sync_handler_0, nc_0) = make_swarm_components(peers_0, sync_behaviour_0, 10); - let (mut sync_handler_1, nc_1) = make_swarm_components(peers_1, fake_sync_behaviour, 10); + let (mut sync_handler_0, nc_0) = + make_swarm_components(peers_0, sync_behaviour_0, SYNC_PROTOCOL_ID, SyncSpec::v1(), 10); + let (mut sync_handler_1, nc_1) = + make_swarm_components(peers_1, fake_sync_behaviour, SYNC_PROTOCOL_ID, SyncSpec::v1(), 10); let mut msg_tx_sync_handler_0 = msg_tx.clone(); let sync_handler_0_handle = async_std::task::spawn(async move { @@ -400,11 +443,16 @@ async fn integration_test_1() { } }); - let (abortable_peer_0, handle_0) = futures::future::abortable( - create_swarm::>(local_key_0, nc_0, addr_0, Peer::First, msg_tx), - ); + let (abortable_peer_0, handle_0) = + futures::future::abortable(create_swarm::, Peer>( + local_key_0, + nc_0, + addr_0, + Peer::First, + msg_tx, + )); let (abortable_peer_1, handle_1) = - futures::future::abortable(create_swarm::>( + futures::future::abortable(create_swarm::, Peer>( local_key_1, nc_1, addr_1, @@ -568,8 +616,20 @@ async fn integration_test_peer_punish_too_slow() { // It's crucial to have a buffer of size 1 for this test let msg_buffer_size = 1; - let (mut sync_handler_0, nc_0) = make_swarm_components(peers_0, sync_behaviour_0, msg_buffer_size); - let (mut sync_handler_1, nc_1) = make_swarm_components(peers_1, sync_behaviour_1, msg_buffer_size); + let (mut sync_handler_0, nc_0) = make_swarm_components( + peers_0, + sync_behaviour_0, + SYNC_PROTOCOL_ID, + SyncSpec::v1(), + msg_buffer_size, + ); + let (mut sync_handler_1, nc_1) = make_swarm_components( + peers_1, + sync_behaviour_1, + SYNC_PROTOCOL_ID, + SyncSpec::v1(), + msg_buffer_size, + ); let mut msg_tx_sync_handler_0 = msg_tx.clone(); let sync_handler_0_handle = async_std::task::spawn(async move { @@ -592,7 +652,7 @@ async fn integration_test_peer_punish_too_slow() { }); let (abortable_peer_0, handle_0) = - futures::future::abortable(create_swarm::>( + futures::future::abortable(create_swarm::, Peer>( local_key_0, nc_0, addr_0, @@ -600,7 +660,7 @@ async fn integration_test_peer_punish_too_slow() { msg_tx.clone(), )); let (abortable_peer_1, handle_1) = - futures::future::abortable(create_swarm::>( + futures::future::abortable(create_swarm::, Peer>( local_key_1, nc_1, addr_1, @@ -746,9 +806,12 @@ async fn integration_test_2() { // Though we spawn multiple tasks we use this single channel for messaging. let (msg_tx, mut msg_rx) = mpsc::channel::<(Peer, Msg)>(10); - let (mut sync_handler_0, nc_0) = make_swarm_components(peers_0, sync_behaviour_0, 10); - let (mut sync_handler_1, nc_1) = make_swarm_components(peers_1, sync_behaviour_1, 10); - let (mut sync_handler_2, nc_2) = make_swarm_components(peers_2, sync_behaviour_2, 10); + let (mut sync_handler_0, nc_0) = + make_swarm_components(peers_0, sync_behaviour_0, SYNC_PROTOCOL_ID, SyncSpec::v1(), 10); + let (mut sync_handler_1, nc_1) = + make_swarm_components(peers_1, sync_behaviour_1, SYNC_PROTOCOL_ID, SyncSpec::v1(), 10); + let (mut sync_handler_2, nc_2) = + make_swarm_components(peers_2, sync_behaviour_2, SYNC_PROTOCOL_ID, SyncSpec::v1(), 10); let mut msg_tx_sync_handler_0 = msg_tx.clone(); let sync_handler_0_handle = async_std::task::spawn(async move { @@ -781,7 +844,7 @@ async fn integration_test_2() { }); let (abortable_peer_0, handle_0) = - futures::future::abortable(create_swarm::>( + futures::future::abortable(create_swarm::, Peer>( local_key_0, nc_0, addr_0.clone(), @@ -789,16 +852,21 @@ async fn integration_test_2() { msg_tx.clone(), )); let (abortable_peer_1, handle_1) = - futures::future::abortable(create_swarm::>( + futures::future::abortable(create_swarm::, Peer>( local_key_1, nc_1, addr_1.clone(), Peer::Second, msg_tx.clone(), )); - let (abortable_peer_2, handle_2) = futures::future::abortable( - create_swarm::>(local_key_2, nc_2, addr_2.clone(), Peer::Third, msg_tx), - ); + let (abortable_peer_2, handle_2) = + futures::future::abortable(create_swarm::, Peer>( + local_key_2, + nc_2, + addr_2.clone(), + Peer::Third, + msg_tx, + )); let (cancel_tx_0, cancel_rx_0) = oneshot::channel::<()>(); let (cancel_tx_1, cancel_rx_1) = oneshot::channel::<()>(); let (cancel_tx_2, cancel_rx_2) = oneshot::channel::<()>(); @@ -908,9 +976,659 @@ async fn integration_test_2() { ); } +fn k256_to_libsecp255k1(secret_key: SecretKey) -> identity::secp256k1::SecretKey { + identity::secp256k1::SecretKey::from_bytes(secret_key.to_bytes().as_mut_slice()).unwrap() +} +#[cfg_attr(feature = "test_peer_punish_too_slow", ignore)] +#[async_std::test] +async fn integration_test_handel_2_peers() { + // -------- -------- -------- + // | peer_0 | ~~~~~~~~> | peer_1 | ~~~~~~~~> | peer_2 | + // -------- -------- -------- + // ^ | + // | | + // | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| + // + // In this scenario `peer_0`, `peer_1` and `peer_2` has `peer_1`, `peer_2` and `peer_0` as a + // bootstrap-peer, respectively (indicated by the arrows) + init_logging_once_for(vec![], LevelFilter::Warn, None); + + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + enum Peer2 { + /// The tag for `peer_0` + First, + /// The tag for `peer_1` + Second, + } + + let mut rng = OsRng; + let sk_0 = SecretKey::random(&mut rng); + let sk_1 = SecretKey::random(&mut rng); + + let local_key_0 = identity::Keypair::from(identity::secp256k1::Keypair::from(k256_to_libsecp255k1( + sk_0.clone(), + ))); + let local_peer_id_0 = PeerId::from(local_key_0.public()); + let local_key_1 = identity::Keypair::from(identity::secp256k1::Keypair::from(k256_to_libsecp255k1( + sk_1.clone(), + ))); + let local_peer_id_1 = PeerId::from(local_key_1.public()); + + let addr_0: Multiaddr = "/ip4/127.0.0.1/tcp/1240".parse().unwrap(); + let addr_1: Multiaddr = "/ip4/127.0.0.1/tcp/1241".parse().unwrap(); + let peers_0 = vec![PeerDestination::PeerIdWithAddr(local_peer_id_1, addr_1.clone())]; + let peers_1 = vec![PeerDestination::PeerIdWithAddr(local_peer_id_0, addr_0.clone())]; + + let handel_conf = HandelConfig { + threshold: Threshold { num: 2, denom: 2 }, + window_shrinking_factor: 4, + initial_scoring_window: 3, + fast_path_window: 10, + dissemination_interval: Duration::from_millis(20), + level_activation_delay: Duration::from_millis(50), + }; + let seed = rng.gen::<[u8; 32]>(); + + let new_committee: HashSet = + vec![PublicKey::from(sk_0.clone()), PublicKey::from(sk_1.clone())] + .into_iter() + .collect(); + let new_message = Digest256::::from([0; 32]); + + let (channel_0, rx_aggregation_0) = futures::channel::oneshot::channel(); + let (channel_1, rx_aggregation_1) = futures::channel::oneshot::channel(); + + let action_0 = AggregationAction::Reset { + new_committee: new_committee.clone(), + new_message, + channel: channel_0, + }; + let action_1 = AggregationAction::Reset { + new_committee: new_committee.clone(), + new_message, + channel: channel_1, + }; + + let gen_perm = PseudoRandomGenPerm::new(seed); + let (mut tx_0, inbox_0) = futures::channel::mpsc::channel::>(10); + let (mut tx_1, inbox_1) = futures::channel::mpsc::channel::>(10); + let sig_aggr_0 = SigmaAggregation::new( + sk_0, + handel_conf, + MakeBinomialPeerPartitions { + rng: gen_perm.clone(), + }, + inbox_0, + VecDeque::new(), + ); + let sig_aggr_1 = SigmaAggregation::new( + sk_1, + handel_conf, + MakeBinomialPeerPartitions { + rng: gen_perm.clone(), + }, + inbox_1, + VecDeque::new(), + ); + let sync_behaviour_0 = |_| sig_aggr_0; + let sync_behaviour_1 = |_| sig_aggr_1; + + // Though we spawn multiple tasks we use this single channel for messaging. + let (msg_tx, mut msg_rx) = mpsc::channel::<(Peer2, Msg)>(10); + + let (mut sync_handler_0, nc_0) = make_oneshot_swarm_components( + peers_0, + sync_behaviour_0, + SIGMA_AGGR_PROTOCOL_ID, + SIGMA_AGGR_V1, + 10, + ); + let (mut sync_handler_1, nc_1) = make_oneshot_swarm_components( + peers_1, + sync_behaviour_1, + SIGMA_AGGR_PROTOCOL_ID, + SIGMA_AGGR_V1, + 10, + ); + + let mut msg_tx_sync_handler_0 = msg_tx.clone(); + let sync_handler_0_handle = async_std::task::spawn(async move { + loop { + let msg = sync_handler_0.select_next_some().await; + msg_tx_sync_handler_0 + .try_send((Peer2::First, Msg::Protocol(msg))) + .unwrap(); + } + }); + + let mut msg_tx_sync_handler_1 = msg_tx.clone(); + let sync_handler_1_handle = async_std::task::spawn(async move { + loop { + let msg = sync_handler_1.select_next_some().await; + msg_tx_sync_handler_1 + .try_send((Peer2::Second, Msg::Protocol(msg))) + .unwrap(); + } + }); + + let (abortable_peer_0, handle_0) = + futures::future::abortable(create_swarm::< + SigmaAggregation>, + Peer2, + >( + local_key_0, nc_0, addr_0.clone(), Peer2::First, msg_tx.clone() + )); + let (abortable_peer_1, handle_1) = + futures::future::abortable(create_swarm::< + SigmaAggregation>, + Peer2, + >(local_key_1, nc_1, addr_1.clone(), Peer2::Second, msg_tx)); + + let (cancel_tx_0, cancel_rx_0) = oneshot::channel::<()>(); + let (cancel_tx_1, cancel_rx_1) = oneshot::channel::<()>(); + + let secs = 10; + + // Spawn tasks for peer_0 + async_std::task::spawn(async move { + let _ = cancel_rx_0.await; + handle_0.abort(); + sync_handler_0_handle.cancel().await; + }); + async_std::task::spawn(async move { + wasm_timer::Delay::new(Duration::from_secs(secs)).await.unwrap(); + cancel_tx_0.send(()).unwrap(); + }); + async_std::task::spawn(abortable_peer_0); + + // Spawn tasks for peer_1 + async_std::task::spawn(async move { + let _ = cancel_rx_1.await; + handle_1.abort(); + sync_handler_1_handle.cancel().await; + }); + async_std::task::spawn(async move { + wasm_timer::Delay::new(Duration::from_secs(secs)).await.unwrap(); + cancel_tx_1.send(()).unwrap(); + }); + async_std::task::spawn(abortable_peer_1); + + //wasm_timer::Delay::new(Duration::from_millis(20)).await.unwrap(); + // Send Reset message + tx_0.try_send(action_0).unwrap(); + tx_1.try_send(action_1).unwrap(); + + // Collect messages from the peers. Note that the while loop below will end since all tasks that + // use clones of `msg_tx` are guaranteed to drop, leading to the senders dropping too. + let mut nc_peer_0 = vec![]; + let mut nc_peer_1 = vec![]; + let mut prot_peer_0 = vec![]; + let mut prot_peer_1 = vec![]; + while let Some((peer, msg)) = msg_rx.next().await { + match msg { + Msg::NetworkController(nc_msg) => match peer { + Peer2::First => nc_peer_0.push(nc_msg), + Peer2::Second => nc_peer_1.push(nc_msg), + }, + Msg::Protocol(p_msg) => match peer { + Peer2::First => prot_peer_0.push(p_msg), + Peer2::Second => prot_peer_1.push(p_msg), + }, + } + } + warn!("Loop passed"); + let res_0 = rx_aggregation_0.await; + let res_1 = rx_aggregation_1.await; + dbg!(res_0); + + dbg!(&nc_peer_0); + dbg!(&nc_peer_1); + dbg!(&prot_peer_0); + dbg!(&prot_peer_1); + + // Check that `peer_0` is sending out the necessary `Peers` messages. + //assert!( + // prot_peer_0.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerIdWithAddr(local_peer_id_1, addr_1) + // ]))) + //); + //assert!( + // prot_peer_0.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerId(local_peer_id_2) + // ]))) + //); + + //// Check that `peer_1` is sending out the necessary `Peers` messages. + //assert!( + // prot_peer_1.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerIdWithAddr(local_peer_id_2, addr_2) + // ]))) + //); + //assert!( + // prot_peer_1.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerId(local_peer_id_0) + // ]))) + //); + + //// Check that `peer_2` is sending out the necessary `Peers` messages. + //assert!( + // prot_peer_2.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerIdWithAddr(local_peer_id_0, addr_0) + // ]))) + //); + //assert!( + // prot_peer_2.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerId(local_peer_id_1) + // ]))) + //); +} + +#[cfg_attr(feature = "test_peer_punish_too_slow", ignore)] +#[async_std::test] +async fn integration_test_handel_4_peers() { + // -------- -------- -------- + // | peer_0 | ~~~~~~~~> | peer_1 | ~~~~~~~~> | peer_2 | + // -------- -------- -------- + // ^ | + // | | + // | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| + // + // In this scenario `peer_0`, `peer_1` and `peer_2` has `peer_1`, `peer_2` and `peer_0` as a + // bootstrap-peer, respectively (indicated by the arrows) + init_logging_once_for(vec![], LevelFilter::Error, None); + + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + enum Peer4 { + /// The tag for `peer_0` + First, + /// The tag for `peer_1` + Second, + /// The tag for `peer_2` + Third, + /// The tag for `peer_3` + Fourth, + } + + let mut rng = OsRng; + let sk_0 = SecretKey::random(&mut rng); + let sk_1 = SecretKey::random(&mut rng); + let sk_2 = SecretKey::random(&mut rng); + let sk_3 = SecretKey::random(&mut rng); + + let local_key_0 = identity::Keypair::from(identity::secp256k1::Keypair::from(k256_to_libsecp255k1( + sk_0.clone(), + ))); + let local_peer_id_0 = PeerId::from(local_key_0.public()); + let local_key_1 = identity::Keypair::from(identity::secp256k1::Keypair::from(k256_to_libsecp255k1( + sk_1.clone(), + ))); + let local_peer_id_1 = PeerId::from(local_key_1.public()); + let local_key_2 = identity::Keypair::from(identity::secp256k1::Keypair::from(k256_to_libsecp255k1( + sk_2.clone(), + ))); + let local_peer_id_2 = PeerId::from(local_key_2.public()); + let local_key_3 = identity::Keypair::from(identity::secp256k1::Keypair::from(k256_to_libsecp255k1( + sk_3.clone(), + ))); + let local_peer_id_3 = PeerId::from(local_key_3.public()); + + let addr_0: Multiaddr = "/ip4/127.0.0.1/tcp/1240".parse().unwrap(); + let addr_1: Multiaddr = "/ip4/127.0.0.1/tcp/1241".parse().unwrap(); + let addr_2: Multiaddr = "/ip4/127.0.0.1/tcp/1242".parse().unwrap(); + let addr_3: Multiaddr = "/ip4/127.0.0.1/tcp/1243".parse().unwrap(); + let peers_0 = vec![ + PeerDestination::PeerIdWithAddr(local_peer_id_1, addr_1.clone()), + PeerDestination::PeerIdWithAddr(local_peer_id_2, addr_2.clone()), + PeerDestination::PeerIdWithAddr(local_peer_id_3, addr_3.clone()), + ]; + let peers_1 = vec![ + PeerDestination::PeerIdWithAddr(local_peer_id_2, addr_2.clone()), + PeerDestination::PeerIdWithAddr(local_peer_id_0, addr_0.clone()), + PeerDestination::PeerIdWithAddr(local_peer_id_3, addr_3.clone()), + ]; + let peers_2 = vec![ + PeerDestination::PeerIdWithAddr(local_peer_id_3, addr_3.clone()), + PeerDestination::PeerIdWithAddr(local_peer_id_0, addr_0.clone()), + PeerDestination::PeerIdWithAddr(local_peer_id_1, addr_1.clone()), + ]; + let peers_3 = vec![ + PeerDestination::PeerIdWithAddr(local_peer_id_0, addr_0.clone()), + PeerDestination::PeerIdWithAddr(local_peer_id_1, addr_1.clone()), + PeerDestination::PeerIdWithAddr(local_peer_id_2, addr_2.clone()), + ]; + + let handel_conf = HandelConfig { + threshold: Threshold { num: 2, denom: 4 }, + window_shrinking_factor: 4, + initial_scoring_window: 3, + fast_path_window: 10, + dissemination_interval: Duration::from_millis(20), + level_activation_delay: Duration::from_millis(50), + }; + let seed = rng.gen::<[u8; 32]>(); + + let new_committee: HashSet = vec![ + PublicKey::from(sk_0.clone()), + PublicKey::from(sk_1.clone()), + PublicKey::from(sk_2.clone()), + PublicKey::from(sk_3.clone()), + ] + .into_iter() + .collect(); + let new_message = Digest256::::from([0; 32]); + + let (channel_0, rx_aggregation_0) = futures::channel::oneshot::channel(); + let (channel_1, rx_aggregation_1) = futures::channel::oneshot::channel(); + let (channel_2, rx_aggregation_2) = futures::channel::oneshot::channel(); + let (channel_3, rx_aggregation_3) = futures::channel::oneshot::channel(); + + let action_0 = AggregationAction::Reset { + new_committee: new_committee.clone(), + new_message, + channel: channel_0, + }; + let action_1 = AggregationAction::Reset { + new_committee: new_committee.clone(), + new_message, + channel: channel_1, + }; + let action_2 = AggregationAction::Reset { + new_committee: new_committee.clone(), + new_message, + channel: channel_2, + }; + let action_3 = AggregationAction::Reset { + new_committee: new_committee.clone(), + new_message, + channel: channel_3, + }; + + let gen_perm = PseudoRandomGenPerm::new(seed); + let (mut tx_0, inbox_0) = futures::channel::mpsc::channel::>(10); + let (mut tx_1, inbox_1) = futures::channel::mpsc::channel::>(10); + let (mut tx_2, inbox_2) = futures::channel::mpsc::channel::>(10); + let (mut tx_3, inbox_3) = futures::channel::mpsc::channel::>(10); + let sig_aggr_0 = SigmaAggregation::new( + sk_0, + handel_conf, + MakeBinomialPeerPartitions { + rng: gen_perm.clone(), + }, + inbox_0, + VecDeque::new(), + ); + let sig_aggr_1 = SigmaAggregation::new( + sk_1, + handel_conf, + MakeBinomialPeerPartitions { + rng: gen_perm.clone(), + }, + inbox_1, + VecDeque::new(), + ); + let sig_aggr_2 = SigmaAggregation::new( + sk_2, + handel_conf, + MakeBinomialPeerPartitions { + rng: gen_perm.clone(), + }, + inbox_2, + VecDeque::new(), + ); + let sig_aggr_3 = SigmaAggregation::new( + sk_3, + handel_conf, + MakeBinomialPeerPartitions { rng: gen_perm }, + inbox_3, + VecDeque::new(), + ); + let sync_behaviour_0 = |_| sig_aggr_0; + let sync_behaviour_1 = |_| sig_aggr_1; + let sync_behaviour_2 = |_| sig_aggr_2; + let sync_behaviour_3 = |_| sig_aggr_3; + + // Though we spawn multiple tasks we use this single channel for messaging. + let (msg_tx, mut msg_rx) = mpsc::channel::<(Peer4, Msg)>(10); + + let (mut sync_handler_0, nc_0) = make_swarm_components( + peers_0, + sync_behaviour_0, + SIGMA_AGGR_PROTOCOL_ID, + SIGMA_AGGR_V1, + 10, + ); + let (mut sync_handler_1, nc_1) = make_swarm_components( + peers_1, + sync_behaviour_1, + SIGMA_AGGR_PROTOCOL_ID, + SIGMA_AGGR_V1, + 10, + ); + let (mut sync_handler_2, nc_2) = make_swarm_components( + peers_2, + sync_behaviour_2, + SIGMA_AGGR_PROTOCOL_ID, + SIGMA_AGGR_V1, + 10, + ); + let (mut sync_handler_3, nc_3) = make_swarm_components( + peers_3, + sync_behaviour_3, + SIGMA_AGGR_PROTOCOL_ID, + SIGMA_AGGR_V1, + 10, + ); + + let mut msg_tx_sync_handler_0 = msg_tx.clone(); + let sync_handler_0_handle = async_std::task::spawn(async move { + loop { + let msg = sync_handler_0.select_next_some().await; + msg_tx_sync_handler_0 + .try_send((Peer4::First, Msg::Protocol(msg))) + .unwrap(); + } + }); + + let mut msg_tx_sync_handler_1 = msg_tx.clone(); + let sync_handler_1_handle = async_std::task::spawn(async move { + loop { + let msg = sync_handler_1.select_next_some().await; + msg_tx_sync_handler_1 + .try_send((Peer4::Second, Msg::Protocol(msg))) + .unwrap(); + } + }); + + let mut msg_tx_sync_handler_2 = msg_tx.clone(); + let sync_handler_2_handle = async_std::task::spawn(async move { + loop { + let msg = sync_handler_2.select_next_some().await; + msg_tx_sync_handler_2 + .try_send((Peer4::Third, Msg::Protocol(msg))) + .unwrap(); + } + }); + + let mut msg_tx_sync_handler_3 = msg_tx.clone(); + let sync_handler_3_handle = async_std::task::spawn(async move { + loop { + let msg = sync_handler_3.select_next_some().await; + msg_tx_sync_handler_3 + .try_send((Peer4::Fourth, Msg::Protocol(msg))) + .unwrap(); + } + }); + + let (abortable_peer_0, handle_0) = + futures::future::abortable(create_swarm::< + SigmaAggregation>, + Peer4, + >( + local_key_0, nc_0, addr_0.clone(), Peer4::First, msg_tx.clone() + )); + let (abortable_peer_1, handle_1) = + futures::future::abortable(create_swarm::< + SigmaAggregation>, + Peer4, + >( + local_key_1, nc_1, addr_1.clone(), Peer4::Second, msg_tx.clone() + )); + let (abortable_peer_2, handle_2) = + futures::future::abortable(create_swarm::< + SigmaAggregation>, + Peer4, + >( + local_key_2, nc_2, addr_2.clone(), Peer4::Third, msg_tx.clone() + )); + let (abortable_peer_3, handle_3) = + futures::future::abortable(create_swarm::< + SigmaAggregation>, + Peer4, + >(local_key_3, nc_3, addr_3.clone(), Peer4::Fourth, msg_tx)); + + let (cancel_tx_0, cancel_rx_0) = oneshot::channel::<()>(); + let (cancel_tx_1, cancel_rx_1) = oneshot::channel::<()>(); + let (cancel_tx_2, cancel_rx_2) = oneshot::channel::<()>(); + let (cancel_tx_3, cancel_rx_3) = oneshot::channel::<()>(); + + let secs = 20; + + // Spawn tasks for peer_0 + async_std::task::spawn(async move { + let _ = cancel_rx_0.await; + handle_0.abort(); + sync_handler_0_handle.cancel().await; + }); + async_std::task::spawn(async move { + wasm_timer::Delay::new(Duration::from_secs(secs)).await.unwrap(); + cancel_tx_0.send(()).unwrap(); + }); + async_std::task::spawn(abortable_peer_0); + + // Spawn tasks for peer_1 + async_std::task::spawn(async move { + let _ = cancel_rx_1.await; + handle_1.abort(); + sync_handler_1_handle.cancel().await; + }); + async_std::task::spawn(async move { + wasm_timer::Delay::new(Duration::from_secs(secs)).await.unwrap(); + cancel_tx_1.send(()).unwrap(); + }); + async_std::task::spawn(abortable_peer_1); + + // Spawn tasks for peer_2 + async_std::task::spawn(async move { + let _ = cancel_rx_2.await; + handle_2.abort(); + sync_handler_2_handle.cancel().await; + }); + async_std::task::spawn(async move { + wasm_timer::Delay::new(Duration::from_secs(secs)).await.unwrap(); + cancel_tx_2.send(()).unwrap(); + }); + async_std::task::spawn(abortable_peer_2); + + // Spawn tasks for peer_3 + async_std::task::spawn(async move { + let _ = cancel_rx_3.await; + handle_3.abort(); + sync_handler_3_handle.cancel().await; + }); + async_std::task::spawn(async move { + wasm_timer::Delay::new(Duration::from_secs(secs)).await.unwrap(); + cancel_tx_3.send(()).unwrap(); + }); + async_std::task::spawn(abortable_peer_3); + + wasm_timer::Delay::new(Duration::from_secs(5)).await.unwrap(); + // Send Reset message + tx_0.try_send(action_0).unwrap(); + tx_1.try_send(action_1).unwrap(); + tx_2.try_send(action_2).unwrap(); + tx_3.try_send(action_3).unwrap(); + + // Collect messages from the peers. Note that the while loop below will end since all tasks that + // use clones of `msg_tx` are guaranteed to drop, leading to the senders dropping too. + let mut nc_peer_0 = vec![]; + let mut nc_peer_1 = vec![]; + let mut nc_peer_2 = vec![]; + let mut nc_peer_3 = vec![]; + let mut prot_peer_0 = vec![]; + let mut prot_peer_1 = vec![]; + let mut prot_peer_2 = vec![]; + let mut prot_peer_3 = vec![]; + while let Some((peer, msg)) = msg_rx.next().await { + match msg { + Msg::NetworkController(nc_msg) => match peer { + Peer4::First => nc_peer_0.push(nc_msg), + Peer4::Second => nc_peer_1.push(nc_msg), + Peer4::Third => nc_peer_2.push(nc_msg), + Peer4::Fourth => nc_peer_3.push(nc_msg), + }, + Msg::Protocol(p_msg) => match peer { + Peer4::First => prot_peer_0.push(p_msg), + Peer4::Second => prot_peer_1.push(p_msg), + Peer4::Third => prot_peer_2.push(p_msg), + Peer4::Fourth => prot_peer_3.push(p_msg), + }, + } + } + let res_0 = rx_aggregation_0.await; + let res_1 = rx_aggregation_1.await; + let res_2 = rx_aggregation_2.await; + let res_3 = rx_aggregation_3.await; + dbg!(res_0); + + dbg!(&nc_peer_0); + dbg!(&nc_peer_1); + dbg!(&nc_peer_2); + dbg!(&prot_peer_0); + dbg!(&prot_peer_1); + dbg!(&prot_peer_2); + + // Check that `peer_0` is sending out the necessary `Peers` messages. + //assert!( + // prot_peer_0.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerIdWithAddr(local_peer_id_1, addr_1) + // ]))) + //); + //assert!( + // prot_peer_0.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerId(local_peer_id_2) + // ]))) + //); + + //// Check that `peer_1` is sending out the necessary `Peers` messages. + //assert!( + // prot_peer_1.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerIdWithAddr(local_peer_id_2, addr_2) + // ]))) + //); + //assert!( + // prot_peer_1.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerId(local_peer_id_0) + // ]))) + //); + + //// Check that `peer_2` is sending out the necessary `Peers` messages. + //assert!( + // prot_peer_2.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerIdWithAddr(local_peer_id_0, addr_0) + // ]))) + //); + //assert!( + // prot_peer_2.contains(&SyncMessage::SyncMessageV1(SyncMessageV1::Peers(vec![ + // PeerDestination::PeerId(local_peer_id_1) + // ]))) + //); +} + fn make_swarm_components( peers: Vec, gen_protocol_behaviour: F, + protocol_id: ProtocolId, + protocol_version: ProtocolVer, msg_buffer_size: usize, ) -> ( ProtocolHandler, @@ -945,7 +1663,7 @@ where let (peer_manager, peers) = PeerManager::new(peer_state, peer_manager_conf); let sync_conf = StatefulProtocolConfig { supported_versions: vec![( - SyncSpec::v1(), + protocol_version, StatefulProtocolSpec { max_message_size: 100, approve_required: true, @@ -953,6 +1671,67 @@ where )], }; + let (requests_snd, requests_recv) = mpsc::channel::(10); + let network_api = NetworkMailbox { + mailbox_snd: requests_snd, + }; + let (sync_handler, sync_mailbox) = + ProtocolHandler::new(gen_protocol_behaviour(peers.clone()), network_api, 10); + let nc = NetworkController::new( + peer_conn_handler_conf, + HashMap::from([(protocol_id, (ProtocolConfig::Stateful(sync_conf), sync_mailbox))]), + peers, + peer_manager, + requests_recv, + ); + + (sync_handler, nc) +} + +fn make_oneshot_swarm_components( + peers: Vec, + gen_protocol_behaviour: F, + protocol_id: ProtocolId, + protocol_version: ProtocolVer, + msg_buffer_size: usize, +) -> ( + ProtocolHandler, + NetworkController, ProtocolMailbox>, +) +where + P: ProtocolBehaviour + Unpin + Send + 'static, + F: FnOnce(PeersMailbox) -> P, +{ + let peer_conn_handler_conf = PeerConnHandlerConf { + async_msg_buffer_size: msg_buffer_size, + sync_msg_buffer_size: msg_buffer_size, + open_timeout: Duration::from_secs(60), + initial_keep_alive: Duration::from_secs(60), + }; + let netw_config = NetworkingConfig { + min_known_peers: 1, + min_outbound: 1, + max_inbound: 10, + max_outbound: 20, + }; + let peer_manager_conf = PeerManagerConfig { + min_acceptable_reputation: Reputation::from(0), + min_reputation: Reputation::from(0), + conn_reset_outbound_backoff: Duration::from_secs(120), + conn_alloc_interval: Duration::from_secs(30), + prot_alloc_interval: Duration::from_secs(30), + protocols_allocation: Vec::new(), + peer_manager_msg_buffer_size: 10, + }; + let peer_state = PeerRepo::new(netw_config, peers); + let (peer_manager, peers) = PeerManager::new(peer_state, peer_manager_conf); + let one_shot_proto_conf = OneShotProtocolConfig { + version: protocol_version, + spec: OneShotProtocolSpec { + max_message_size: 100, + }, + }; + let (requests_snd, requests_recv) = mpsc::channel::(10); let network_api = NetworkMailbox { mailbox_snd: requests_snd, @@ -962,8 +1741,8 @@ where let nc = NetworkController::new( peer_conn_handler_conf, HashMap::from([( - SYNC_PROTOCOL_ID, - (ProtocolConfig::Stateful(sync_conf), sync_mailbox), + protocol_id, + (ProtocolConfig::OneShot(one_shot_proto_conf), sync_mailbox), )]), peers, peer_manager, @@ -1014,17 +1793,18 @@ pub fn make_nc_without_protocol_handler( (nc, requests_snd) } -async fn create_swarm

( +async fn create_swarm( local_key: identity::Keypair, nc: NetworkController, ProtocolMailbox>, addr: Multiaddr, - peer: Peer, + peer: PT, mut tx: mpsc::Sender<( - Peer, + PT, Msg<<

::TProto as spectrum_network::protocol_handler::ProtocolSpec>::TMessage>, )>, ) where P: ProtocolBehaviour + Unpin + Send + 'static, + PT: std::fmt::Debug + Copy, { let transport = libp2p::development_transport(local_key.clone()).await.unwrap(); let local_peer_id = PeerId::from(local_key.public());