diff --git a/daemon/src/peer.rs b/daemon/src/peer.rs index d2c49ec10..fef72e18e 100644 --- a/daemon/src/peer.rs +++ b/daemon/src/peer.rs @@ -31,7 +31,14 @@ pub mod gossip; pub mod include; mod run_state; -pub use run_state::{config as run_config, Config as RunConfig, Event, Status, WaitingRoomEvent}; +pub use run_state::{ + config as run_config, + Config as RunConfig, + Event, + NetworkDiagnosticEvent, + Status, + WaitingRoomEvent, +}; mod subroutines; use subroutines::Subroutines; @@ -161,6 +168,7 @@ where let (addrs_tx, addrs_rx) = watch::channel(vec![]); let protocol_events = peer.subscribe().boxed(); + let network_diagnostic_events = peer.subscribe_to_diagnostic_events().boxed(); let subroutines = Subroutines::new( peer.clone(), addrs_rx, @@ -169,6 +177,7 @@ where protocol_events, subscriber, control_receiver, + network_diagnostic_events, ) .run() .fuse() diff --git a/daemon/src/peer/run_state.rs b/daemon/src/peer/run_state.rs index 552260a30..46757468f 100644 --- a/daemon/src/peer/run_state.rs +++ b/daemon/src/peer/run_state.rs @@ -13,6 +13,7 @@ use std::{ use serde::Serialize; +pub use librad::net::protocol::event::NetworkDiagnosticEvent; use librad::{ git::Urn, net::{ @@ -88,6 +89,8 @@ pub enum Event { }, /// A state change occurred in the waiting room WaitingRoomTransition(WaitingRoomTransition), + /// Logs of sent and received RPC messages + NetworkDiagnostic(NetworkDiagnosticEvent), } impl From> for Event { @@ -96,6 +99,12 @@ impl From> for Event { } } +impl From for Event { + fn from(event: NetworkDiagnosticEvent) -> Self { + Self::NetworkDiagnostic(event) + } +} + impl MaybeFrom<&Input> for Event { fn maybe_from(input: &Input) -> Option { match input { @@ -209,6 +218,7 @@ impl RunState { Input::PeerSync(peer_sync_input) => self.handle_peer_sync(&peer_sync_input), Input::Request(request_input) => self.handle_request(request_input), Input::Stats(stats_input) => self.handle_stats(stats_input), + Input::NetworkDiagnostic(ev) => vec![Command::EmitEvent(ev.into())], }; log::trace!("TRANSITION END: {:?} {:?}", self.status, cmds); @@ -318,7 +328,6 @@ impl RunState { result, } => { cmds.extend(self.waiting_room.found(&urn, peer_id, SystemTime::now())); - if let PutResult::Applied(_) = result { cmds.push(Command::Include(urn)); } diff --git a/daemon/src/peer/run_state/input.rs b/daemon/src/peer/run_state/input.rs index a345c244f..94ece1006 100644 --- a/daemon/src/peer/run_state/input.rs +++ b/daemon/src/peer/run_state/input.rs @@ -7,7 +7,12 @@ use std::{net::SocketAddr, time::SystemTime}; use tokio::sync::oneshot; -use librad::{git::Urn, net, net::peer::ProtocolEvent, peer::PeerId}; +use librad::{ + git::Urn, + net, + net::{peer::ProtocolEvent, protocol::event::NetworkDiagnosticEvent}, + peer::PeerId, +}; use crate::{ peer::announcement, @@ -31,6 +36,7 @@ pub enum Input { /// the network. Request(Request), Stats(Stats), + NetworkDiagnostic(NetworkDiagnosticEvent), } /// Announcement subroutine lifecycle events. diff --git a/daemon/src/peer/subroutines.rs b/daemon/src/peer/subroutines.rs index c8607591f..e65c0d8d0 100644 --- a/daemon/src/peer/subroutines.rs +++ b/daemon/src/peer/subroutines.rs @@ -6,6 +6,7 @@ //! Management of peer subroutine tasks driven by advancing the core state //! machine with a stream of inputs, producing commands. +use librad::net::protocol::event::NetworkDiagnosticEvent; use std::{ net::SocketAddr, time::{Duration, SystemTime}, @@ -69,6 +70,7 @@ pub struct Subroutines { impl Subroutines { /// Constructs a new subroutines manager. + #[allow(clippy::too_many_arguments)] pub fn new( peer: net::peer::Peer, mut listen_addrs: watch::Receiver>, @@ -77,6 +79,10 @@ impl Subroutines { protocol_events: BoxStream<'static, Result>, subscriber: broadcast::Sender, mut control_receiver: mpsc::Receiver, + network_diagnostic_events: BoxStream< + 'static, + Result, + >, ) -> Self { let announce_timer = if run_config.announce.interval.is_zero() { None @@ -126,6 +132,23 @@ impl Subroutines { .boxed(), ); + coalesced.push( + network_diagnostic_events + .filter_map(|res| async move { + match res { + Ok(ev) => Some(Input::NetworkDiagnostic(ev)), + Err(err) => { + log::warn!( + "receive error for network diagnostic event log: {}", + err + ); + None + }, + } + }) + .boxed(), + ); + coalesced.push( stream! { while listen_addrs.changed().await.is_ok() { diff --git a/librad/Cargo.toml b/librad/Cargo.toml index 3f1fe9f85..23b1df4c9 100644 --- a/librad/Cargo.toml +++ b/librad/Cargo.toml @@ -97,7 +97,7 @@ features = ["tls-rustls"] [dependencies.radicle-data] path = "../data" -features = ["minicbor"] +features = ["minicbor", "serde"] [dependencies.radicle-git-ext] path = "../git-ext" diff --git a/librad/src/net/peer.rs b/librad/src/net/peer.rs index 549d3ec8e..116297fb4 100644 --- a/librad/src/net/peer.rs +++ b/librad/src/net/peer.rs @@ -10,7 +10,7 @@ use futures_timer::Delay; use thiserror::Error; use tokio::task::spawn_blocking; -use super::protocol::{self, gossip}; +use super::protocol::{self, event::NetworkDiagnosticEvent, gossip}; use crate::{ git::{self, storage::Fetchers, Urn}, signer::Signer, @@ -240,6 +240,12 @@ where self.phone.subscribe() } + pub fn subscribe_to_diagnostic_events( + &self, + ) -> impl futures::Stream> { + self.phone.subscribe_diagnostic_events() + } + /// Borrow a [`git::storage::Storage`] from the pool, and run a blocking /// computation on it. pub async fn using_storage(&self, blocking: F) -> Result diff --git a/librad/src/net/protocol.rs b/librad/src/net/protocol.rs index 53b6d03a6..3b81f26e0 100644 --- a/librad/src/net/protocol.rs +++ b/librad/src/net/protocol.rs @@ -197,6 +197,7 @@ pub fn accept( state, incoming, periodic, + .. }: Bound, disco: Disco, ) -> impl Future> diff --git a/librad/src/net/protocol/broadcast.rs b/librad/src/net/protocol/broadcast.rs index 6e9a9967e..ba486173e 100644 --- a/librad/src/net/protocol/broadcast.rs +++ b/librad/src/net/protocol/broadcast.rs @@ -13,7 +13,9 @@ use crate::PeerId; mod storage; pub use storage::{LocalStorage, PutResult}; -#[derive(Clone, Debug, PartialEq, minicbor::Encode, minicbor::Decode)] +use serde::Serialize; + +#[derive(Clone, Debug, PartialEq, minicbor::Encode, minicbor::Decode, Serialize)] pub enum Message { #[n(0)] #[cbor(array)] @@ -46,7 +48,7 @@ pub(super) trait ErrorRateLimited { #[derive(Debug, Error)] pub enum Error where - A: Debug, + A: Debug + Clone + Ord, P: Debug, { #[error("unsolicited message from {remote_id}")] @@ -68,7 +70,7 @@ where M: Membership, S: LocalStorage + ErrorRateLimited, F: Fn() -> PeerInfo, - A: Clone + Debug + Send + 'static, + A: Clone + Debug + Send + Ord + 'static, P: Clone + Debug, { use tick::Tock::*; diff --git a/librad/src/net/protocol/event.rs b/librad/src/net/protocol/event.rs index fae2e518d..e284e6438 100644 --- a/librad/src/net/protocol/event.rs +++ b/librad/src/net/protocol/event.rs @@ -5,9 +5,11 @@ use std::{collections::HashMap, net::SocketAddr}; -use super::{broadcast, error, gossip, interrogation, membership}; +use super::{broadcast, error, gossip, interrogation, io::Rpc, membership}; use crate::PeerId; +use serde::Serialize; + #[derive(Clone)] pub enum Downstream { Gossip(downstream::Gossip), @@ -169,3 +171,57 @@ pub mod upstream { } } } + +#[derive(Clone, Debug, Serialize)] +#[serde(tag="type", rename_all="camelCase")] +pub enum NetworkDiagnosticEvent { + GossipSent { + to: SocketAddr, + message: broadcast::Message, + }, + GossipReceived { + from: SocketAddr, + message: broadcast::Message, + }, + HpvSent { + to: SocketAddr, + message: membership::Message, + }, + HpvReceived { + from: SocketAddr, + message: membership::Message, + }, +} + +impl NetworkDiagnosticEvent { + pub(crate) fn hpv_sent( + to: SocketAddr, + message: membership::Message, + ) -> NetworkDiagnosticEvent { + NetworkDiagnosticEvent::HpvSent { to, message } + } + + pub(crate) fn hpv_received( + from: SocketAddr, + message: membership::Message, + ) -> NetworkDiagnosticEvent { + NetworkDiagnosticEvent::HpvReceived { from, message } + } + + pub(crate) fn gossip_received( + from: SocketAddr, + message: broadcast::Message, + ) -> NetworkDiagnosticEvent { + NetworkDiagnosticEvent::GossipReceived { from, message } + } + + pub(crate) fn rpc_sent( + to: SocketAddr, + rpc: Rpc, + ) -> NetworkDiagnosticEvent { + match rpc { + Rpc::Membership(message) => NetworkDiagnosticEvent::HpvSent { to, message }, + Rpc::Gossip(message) => NetworkDiagnosticEvent::GossipSent { to, message }, + } + } +} diff --git a/librad/src/net/protocol/gossip.rs b/librad/src/net/protocol/gossip.rs index 8e0ca26aa..b8c182f31 100644 --- a/librad/src/net/protocol/gossip.rs +++ b/librad/src/net/protocol/gossip.rs @@ -7,6 +7,8 @@ use minicbor::{Decode, Decoder, Encode, Encoder}; use crate::{identities::git::Urn, peer::PeerId}; +use serde::Serialize; + #[derive(Clone, Debug, PartialEq)] pub enum Rev { Git(git2::Oid), @@ -43,8 +45,19 @@ impl<'de> Decode<'de> for Rev { } } +impl Serialize for Rev { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + Self::Git(oid) => serializer.serialize_str(oid.to_string().as_str()), + } + } +} + /// The gossip payload type -#[derive(Clone, Debug, PartialEq, Encode, Decode)] +#[derive(Clone, Debug, PartialEq, Encode, Decode, Serialize)] #[cbor(array)] pub struct Payload { /// URN of an updated or wanted repo. diff --git a/librad/src/net/protocol/info.rs b/librad/src/net/protocol/info.rs index 5cbc7da61..ddce8e018 100644 --- a/librad/src/net/protocol/info.rs +++ b/librad/src/net/protocol/info.rs @@ -7,11 +7,12 @@ use std::{collections::BTreeSet, convert::TryFrom, option::NoneError}; use data::BoundedVec; use minicbor::{Decode, Encode}; +use serde::Serialize; use typenum::U16; use crate::peer::PeerId; -#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Encode, Decode)] +#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Encode, Decode, Serialize)] #[repr(u8)] pub enum Capability { #[n(0)] @@ -86,7 +87,7 @@ impl From> for (PeerId, Vec) { } } -#[derive(Debug, Clone, PartialEq, Encode)] +#[derive(Debug, Clone, PartialEq, Encode, Serialize)] #[cbor(array)] pub struct GenericPeerInfo { #[n(0)] @@ -159,7 +160,7 @@ impl<'__b777, Addr: minicbor::Decode<'__b777>, T: minicbor::Decode<'__b777>> }) } } -#[derive(Debug, Clone, PartialEq, Encode)] +#[derive(Debug, Clone, PartialEq, Encode, Serialize)] #[cbor(array)] pub struct PeerAdvertisement { #[n(0)] diff --git a/librad/src/net/protocol/io.rs b/librad/src/net/protocol/io.rs index df5584188..5070d9b8b 100644 --- a/librad/src/net/protocol/io.rs +++ b/librad/src/net/protocol/io.rs @@ -3,6 +3,7 @@ // This file is part of radicle-link, distributed under the GPLv3 with Radicle // Linking Exception. For full terms see the included LICENSE file. +use crate::net::protocol::event::NetworkDiagnosticEvent; use std::{iter, net::SocketAddr}; use data::BoundedVec; @@ -45,11 +46,12 @@ where } if let Some((conn, ingress)) = connect(&state.endpoint, peer, addrs).await { - let rpc_sent = send_rpc::<_, ()>( - &conn, - state.membership.hello(peer_advertisement(&state.endpoint)), - ) - .await; + let msg = state.membership.hello(peer_advertisement(&state.endpoint)); + let rpc: Rpc<_, ()> = msg.clone().into(); + let rpc_sent = send_rpc::<_, ()>(&conn, rpc).await; + state + .phone + .emit_diagnostic_event(NetworkDiagnosticEvent::hpv_sent(conn.remote_addr(), msg)); match rpc_sent { Err(e) => tracing::warn!(err = ?e, "failed to send membership hello"), diff --git a/librad/src/net/protocol/io/recv/gossip.rs b/librad/src/net/protocol/io/recv/gossip.rs index 6a737c0d3..1df98f095 100644 --- a/librad/src/net/protocol/io/recv/gossip.rs +++ b/librad/src/net/protocol/io/recv/gossip.rs @@ -13,7 +13,7 @@ use futures_codec::FramedRead; use crate::{ net::{ - connection::RemotePeer, + connection::{RemoteAddr, RemotePeer}, protocol::{ broadcast, event, @@ -35,7 +35,7 @@ pub(in crate::net::protocol) async fn gossip( stream: Upgraded, ) where S: ProtocolStorage + Clone + 'static, - T: RemotePeer + AsyncRead + Unpin, + T: RemotePeer + RemoteAddr + AsyncRead + Unpin, { let mut recv = FramedRead::new(stream.into_stream(), codec::Gossip::new()); let remote_id = recv.remote_peer_id(); @@ -58,6 +58,12 @@ pub(in crate::net::protocol) async fn gossip( advertised_info: peer_advertisement(&state.endpoint), seen_addrs: iter::empty().into(), }; + state + .phone + .emit_diagnostic_event(event::NetworkDiagnosticEvent::gossip_received( + recv.remote_addr(), + msg.clone(), + )); match broadcast::apply( &state.membership, &state.storage, @@ -126,7 +132,7 @@ where } } -fn disconnect(remote_id: PeerId) -> membership::Tick { +fn disconnect(remote_id: PeerId) -> membership::Tick { membership::Tick::Reply { to: remote_id, message: membership::Message::Disconnect, diff --git a/librad/src/net/protocol/io/recv/membership.rs b/librad/src/net/protocol/io/recv/membership.rs index 9ac7002e2..ceb098775 100644 --- a/librad/src/net/protocol/io/recv/membership.rs +++ b/librad/src/net/protocol/io/recv/membership.rs @@ -15,6 +15,7 @@ use crate::{ net::{ connection::RemoteInfo, protocol::{ + event::NetworkDiagnosticEvent, gossip, io::{codec, peer_advertisement}, membership, @@ -47,6 +48,12 @@ pub(in crate::net::protocol) async fn membership( }, Ok(msg) => { + state + .phone + .emit_diagnostic_event(NetworkDiagnosticEvent::hpv_received( + remote_addr, + msg.clone(), + )); let info = || peer_advertisement(&state.endpoint); match membership::apply(&state.membership, &info, remote_id, remote_addr, msg) { Err(e) => { diff --git a/librad/src/net/protocol/io/send/rpc.rs b/librad/src/net/protocol/io/send/rpc.rs index e79d09725..60b92ebb0 100644 --- a/librad/src/net/protocol/io/send/rpc.rs +++ b/librad/src/net/protocol/io/send/rpc.rs @@ -15,7 +15,7 @@ use crate::net::{ upgrade, }; -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum Rpc { Membership(membership::Message), Gossip(broadcast::Message), diff --git a/librad/src/net/protocol/membership/rpc.rs b/librad/src/net/protocol/membership/rpc.rs index 159f99a26..ec0b04e38 100644 --- a/librad/src/net/protocol/membership/rpc.rs +++ b/librad/src/net/protocol/membership/rpc.rs @@ -5,7 +5,9 @@ use crate::net::protocol::info::{PeerAdvertisement, PeerInfo}; -#[derive(Debug, Clone, PartialEq, minicbor::Encode, minicbor::Decode)] +use serde::Serialize; + +#[derive(Debug, Clone, PartialEq, minicbor::Encode, minicbor::Decode, Serialize)] pub enum Message { #[n(0)] #[cbor(array)] diff --git a/librad/src/net/protocol/tick.rs b/librad/src/net/protocol/tick.rs index 22f6d22ea..49deeb754 100644 --- a/librad/src/net/protocol/tick.rs +++ b/librad/src/net/protocol/tick.rs @@ -11,8 +11,8 @@ use futures::{ }; use tracing::Instrument as _; -use super::{error, gossip, io, membership, PeerInfo, ProtocolStorage, State}; -use crate::PeerId; +use super::{error, event, gossip, io, membership, PeerInfo, ProtocolStorage, State}; +use crate::{net::connection::RemoteAddr, PeerId}; #[derive(Debug)] pub(super) enum Tock { @@ -84,7 +84,7 @@ where }, Some(conn) => { - io::send_rpc(&conn, message) + io::send_rpc(&conn, message.clone()) .map_err(|e| { let membership::TnT { trans, ticks: cont } = state.membership.connection_lost(to); @@ -95,11 +95,19 @@ where source: e.into(), }) }) - .await + .await?; + state + .phone + .emit_diagnostic_event(event::NetworkDiagnosticEvent::rpc_sent( + conn.remote_addr(), + message, + )); + Ok(()) }, }, AttemptSend { to, message } => { + let phone = state.phone.clone(); let conn = match state.endpoint.get_connection(to.peer_id) { None => { let (conn, ingress) = io::connect_peer_info(&state.endpoint, to.clone()) @@ -111,9 +119,14 @@ where }, Some(conn) => Ok::<_, error::Tock>(conn), }?; - Ok(io::send_rpc(&conn, message) + io::send_rpc(&conn, message.clone()) .await - .map_err(error::BestEffortSend::SendGossip)?) + .map_err(error::BestEffortSend::SendGossip)?; + phone.emit_diagnostic_event(event::NetworkDiagnosticEvent::rpc_sent( + conn.remote_addr(), + message, + )); + Ok(()) }, Disconnect { peer } => { diff --git a/librad/src/net/protocol/tincans.rs b/librad/src/net/protocol/tincans.rs index 2d80aba1f..56fe1a7ba 100644 --- a/librad/src/net/protocol/tincans.rs +++ b/librad/src/net/protocol/tincans.rs @@ -22,6 +22,7 @@ use crate::PeerId; pub struct TinCans { pub(super) downstream: tincan::Sender, pub(super) upstream: tincan::Sender, + diagnostic_events: tincan::Sender, } impl TinCans { @@ -29,6 +30,7 @@ impl TinCans { Self { downstream: tincan::channel(16).0, upstream: tincan::channel(16).0, + diagnostic_events: tincan::channel(16).0, } } @@ -141,6 +143,18 @@ impl TinCans { pub(super) fn emit(&self, evt: impl Into) { self.upstream.send(evt.into()).ok(); } + + pub fn subscribe_diagnostic_events( + &self, + ) -> impl futures::Stream> { + let mut r = self.diagnostic_events.subscribe(); + async_stream::stream! { loop { yield r.recv().await } } + } + + pub(super) fn emit_diagnostic_event(&self, evt: impl Into) { + tracing::info!("Emitting log event"); + self.diagnostic_events.send(evt.into()).ok(); + } } impl Default for TinCans {