Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial work for handel integration test #32

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spectrum-crypto/src/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::marker::PhantomData;
use serde::{Deserialize, Serialize, Serializer};
use thiserror::Error;

#[derive(Eq, PartialEq, Copy, Clone, Ord, PartialOrd)]
#[derive(Debug, Eq, PartialEq, Copy, Clone, Ord, PartialOrd)]
pub struct Blake2b;

#[derive(Eq, PartialEq, Copy, Clone, Ord, PartialOrd)]
Expand Down
5 changes: 3 additions & 2 deletions spectrum-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ integration_tests = []
algebra-core = { version = "0.1.0", path = "../algebra-core" }
spectrum-crypto = { version = "0.1.0", path = "../spectrum-crypto" }
libp2p = { version = "0.51.*", features = ["noise", "yamux", "secp256k1"] }
libp2p-identity = "0.1.1"
libp2p-identity = { version = "0.1.1", features = ["secp256k1"]}
futures = "0.3.21"
async-std = { version = "1.10.0", features = ["attributes"] }
unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_codec"] }
Expand All @@ -39,4 +39,5 @@ group = "0.13.*"
nonempty = "0.8.1"

[dev-dependencies]
libp2p = { version = "0.51.*", features = ["noise", "yamux", "async-std"] }
libp2p = { version = "0.51.*", features = ["noise", "yamux", "async-std", "secp256k1"] }
log4rs_test_utils = {version = "0.2.3", featuers = ["test_logging"]}
16 changes: 12 additions & 4 deletions spectrum-network/src/network_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use libp2p::swarm::{
NetworkBehaviour, NotifyHandler, PollParameters, ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use log::{trace, warn};
use log::{error, trace, warn};

use crate::one_shot_upgrade::OneShotMessage;
use crate::peer_conn_handler::message_sink::MessageSink;
Expand Down Expand Up @@ -162,6 +162,7 @@ impl NetworkAPI for NetworkMailbox {
);
}
fn send_one_shot_message(&self, peer: PeerId, protocol: ProtocolTag, message: RawMessage) {
warn!("Sending one shot message!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
let _ = futures::executor::block_on(self.mailbox_snd.clone().send(
NetworkControllerIn::SendOneShotMessage {
peer,
Expand Down Expand Up @@ -311,6 +312,7 @@ where
}
}
ProtocolConfig::OneShot(one_shot) => {
warn!("init_conn_handler: {:?}", one_shot);
one_shot_protocols.insert(
*protocol_id,
OneShotProtocol {
Expand Down Expand Up @@ -450,6 +452,7 @@ where

FromSwarm::DialFailure(DialFailure { peer_id, .. }) => {
if let Some(peer_id) = peer_id {
warn!("QQQQQQQ: dialfailure {:?}", peer_id);
self.peers.dial_failure(peer_id);
}
}
Expand Down Expand Up @@ -478,7 +481,7 @@ where
out_channel,
handshake,
} => {
trace!("Protocol {} opened with peer {}", protocol_tag, peer_id);
error!("Protocol {} opened with peer {}", protocol_tag, peer_id);
if let Some(ConnectedPeer::Connected {
enabled_protocols, ..
}) = self.enabled_peers.get_mut(&peer_id)
Expand All @@ -487,7 +490,7 @@ where
let protocol_ver = protocol_tag.protocol_ver();
match enabled_protocols.entry(protocol_id) {
Entry::Occupied(mut entry) => {
trace!(
error!(
"Current state of protocol {:?} is {:?}",
protocol_id,
entry.get().0
Expand Down Expand Up @@ -570,6 +573,7 @@ where
protocol_tag,
content,
} => {
panic!("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS");
if let Some((_, han)) = self.supported_protocols.get(&protocol_tag.protocol_id()) {
han.incoming_msg(peer_id, protocol_tag.protocol_ver(), content);
}
Expand Down Expand Up @@ -720,6 +724,7 @@ where
Entry::Occupied(mut enabled_peer) => match enabled_peer.get_mut() {
ConnectedPeer::Connected { conn_id, .. }
| ConnectedPeer::PendingApprove(conn_id) => {
panic!("AAA");
// if the peer is enabled already we reuse existing connection
self.pending_actions.push_back(ToSwarm::NotifyHandler {
peer_id: peer,
Expand All @@ -740,9 +745,12 @@ where
content: message,
});
}
ConnectedPeer::PendingDisconnect(_) => {} // todo: wait for disconnect; reconnect?
ConnectedPeer::PendingDisconnect(_) => {
panic!("CCC");
} // todo: wait for disconnect; reconnect?
},
Entry::Vacant(not_enabled_peer) => {
warn!("DDD");
self.pending_actions
.push_back(ToSwarm::Dial { opts: peer.into() });
not_enabled_peer.insert(ConnectedPeer::PendingConnect {
Expand Down
4 changes: 2 additions & 2 deletions spectrum-network/src/peer_conn_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use libp2p::swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use libp2p::PeerId;
use log::trace;
use log::{trace, warn};
use rand::rngs::OsRng;
use rand::RngCore;

Expand Down Expand Up @@ -460,7 +460,7 @@ impl ConnectionHandler for PeerConnHandler {
protocol: future::Either::Right(rid),
..
}) => {
trace!("{:?} has been fired", rid);
warn!("{:?} has been fired", rid);
self.pending_one_shots.remove(&rid);
}

Expand Down
3 changes: 2 additions & 1 deletion spectrum-network/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, Stream};
use libp2p::swarm::ConnectionId;
use libp2p::PeerId;
use log::{error, info, trace};
use log::{error, info, trace, warn};
use wasm_timer::Delay;

use crate::peer_conn_handler::ConnHandlerError;
Expand Down Expand Up @@ -488,6 +488,7 @@ impl<S: PeersState> PeerManagerNotificationsBehavior for PeerManager<S> {

fn on_connection_established(&mut self, peer_id: PeerId, conn_id: ConnectionId) {
if let Some(PeerInState::Connected(mut cp)) = self.state.peer(&peer_id) {
warn!("ConnectedPeer: confirming connection {:?}", cp);
cp.confirm_connection();
} else {
error!("Peer {} hasn't been acknowledged as connected", peer_id)
Expand Down
11 changes: 8 additions & 3 deletions spectrum-network/src/protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::channel::mpsc::Receiver;
use futures::Stream;
pub use libp2p::swarm::NetworkBehaviour;
use libp2p::PeerId;
use log::{error, trace};
use log::{error, trace, warn};

use crate::network_controller::NetworkAPI;
use crate::peer_conn_handler::message_sink::MessageSink;
Expand Down Expand Up @@ -254,21 +254,25 @@ where
continue;
}
Poll::Ready(None) => return Poll::Ready(None), // terminate, behaviour is exhausted
Poll::Pending => {}
Poll::Pending => {
warn!("Behaviour returned PENDING");
}
}

// 2. Poll incoming events.
if let Poll::Ready(Some(notif)) = Stream::poll_next(Pin::new(&mut self.inbox), cx) {
warn!("ProtocolHandler.inbox: {:?}", notif);
match notif {
ProtocolEvent::Connected(peer_id) => {
trace!("Connected {:?}", peer_id);
warn!("Connected {:?}", peer_id);
self.behaviour.inject_peer_connected(peer_id);
}
ProtocolEvent::Message {
peer_id,
protocol_ver: negotiated_ver,
content,
} => {
warn!("ProtocolEvent::Message received");
if let Ok(msg) = codec::decode::<
<<TBehaviour as ProtocolBehaviour>::TProto as ProtocolSpec>::TMessage,
>(content)
Expand Down Expand Up @@ -330,6 +334,7 @@ where
handshake,
} => {
self.peers.insert(peer_id, sink);
error!("peers: {:?}", self.peers);
match handshake.map(
codec::decode::<
<<TBehaviour as ProtocolBehaviour>::TProto as ProtocolSpec>::THandshake,
Expand Down
8 changes: 7 additions & 1 deletion spectrum-network/src/protocol_handler/handel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::{Duration, Instant};

use either::{Either, Left, Right};
use libp2p::PeerId;
use log::{trace, warn};
use void::Void;

use algebra_core::CommutativePartialSemigroup;
Expand Down Expand Up @@ -97,6 +98,8 @@ pub struct Handel<C, P, PP> {
level_activation_schedule: Vec<Option<Instant>>,
}

pub type MakeHandel<'a, C, P, PP> = dyn Fn(HandelConfig, C, P, PP) -> Box<dyn HandelRound<'a, C, PP>>;

impl<C, P, PP> Handel<C, P, PP>
where
C: CommutativePartialSemigroup + Weighted + VerifiableAgainst<P> + Eq + Clone + Debug,
Expand Down Expand Up @@ -323,7 +326,7 @@ where
/// Sends messages for one node from each active level.
fn run_dissemination(&mut self) {
let own_contrib = self.get_own_contribution();
for (lix, lvl) in &mut self.levels.iter_mut().enumerate() {
for (lix, lvl) in &mut self.levels.iter_mut().enumerate().skip(1) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should start from level 1, since level 0 is empty

if let Some(active_lvl) = lvl {
let peers_at_level = self.peer_partitions.peers_at_level(lix, PeerOrd::CVP);
let maybe_next_peer = active_lvl
Expand Down Expand Up @@ -470,9 +473,12 @@ where
self.try_disseminatate();
self.try_activate_levels();
if let Some(out) = self.outbox.pop_front() {
warn!("Handel::poll: return Left({:?})", out);

return Poll::Ready(Left(out));
}
if let Some(ca) = self.get_complete_aggregate() {
warn!("Handel::poll: return Right(ca)");
return Poll::Ready(Right(ca));
}
Poll::Pending
Expand Down
11 changes: 8 additions & 3 deletions spectrum-network/src/protocol_handler/handel/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ pub struct BinomialPeerPartitions<R> {
rng: R,
}

#[derive(Clone)]
pub struct MakeBinomialPeerPartitions<R> {
rng: R,
pub rng: R,
}

impl<R> MakePeerPartitions for MakeBinomialPeerPartitions<R>
Expand Down Expand Up @@ -149,11 +150,15 @@ where
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let partitions_by_vp = ordered_by_vp(&rng, cleared_partitions.clone(), host_peer_ix);
let partitions_by_cvp = ordered_by_cvp(&rng, cleared_partitions, host_peer_ix, num_nodes);
println!("partitions_by_vp: {:?}", partitions_by_vp);
println!("partitions_by_cvp: {:?}", partitions_by_cvp);
Self {
peers: all_peers,
peer_index: total_index,
partitions_by_vp: ordered_by_vp(&rng, cleared_partitions.clone(), host_peer_ix),
partitions_by_cvp: ordered_by_cvp(&rng, cleared_partitions, host_peer_ix, num_nodes),
partitions_by_vp,
partitions_by_cvp,
rng,
}
}
Expand Down
Loading