Skip to content

Commit

Permalink
Track RPC events and emit them to interested parties
Browse files Browse the repository at this point in the history
It is useful when debugging to be able to track network events as they
occur. This is tricky to do by just tailing the logs as it's quite
noisy. This PR adds a `NetworkDiagnosticEvent` enum which is emitted to a
channel in `TinCans::diagnostic_events`. Any time we receive or send an
RPC message we send it to this channel. This allows interested parties
to subscribe to the channel and display RPC events for diagnostic
purposes.
  • Loading branch information
alexjg committed May 7, 2021
1 parent f9fa830 commit 3ccde30
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 29 deletions.
11 changes: 10 additions & 1 deletion daemon/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ pub mod gossip;
pub mod include;

mod run_state;
pub use run_state::{config as run_config, Config as RunConfig, Event, Status, WaitingRoomEvent};
pub use run_state::{
config as run_config,
Config as RunConfig,
Event,
NetworkDiagnosticEvent,
Status,
WaitingRoomEvent,
};

mod subroutines;
use subroutines::Subroutines;
Expand Down Expand Up @@ -161,6 +168,7 @@ where
let (addrs_tx, addrs_rx) = watch::channel(vec![]);

let protocol_events = peer.subscribe().boxed();
let network_diagnostic_events = peer.subscribe_to_diagnostic_events().boxed();
let subroutines = Subroutines::new(
peer.clone(),
addrs_rx,
Expand All @@ -169,6 +177,7 @@ where
protocol_events,
subscriber,
control_receiver,
network_diagnostic_events,
)
.run()
.fuse()
Expand Down
11 changes: 10 additions & 1 deletion daemon/src/peer/run_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{

use serde::Serialize;

pub use librad::net::protocol::event::NetworkDiagnosticEvent;
use librad::{
git::Urn,
net::{
Expand Down Expand Up @@ -88,6 +89,8 @@ pub enum Event {
},
/// A state change occurred in the waiting room
WaitingRoomTransition(WaitingRoomTransition<SystemTime>),
/// Logs of sent and received RPC messages
NetworkDiagnostic(NetworkDiagnosticEvent),
}

impl From<WaitingRoomTransition<SystemTime>> for Event {
Expand All @@ -96,6 +99,12 @@ impl From<WaitingRoomTransition<SystemTime>> for Event {
}
}

impl From<NetworkDiagnosticEvent> for Event {
fn from(event: NetworkDiagnosticEvent) -> Self {
Self::NetworkDiagnostic(event)
}
}

impl MaybeFrom<&Input> for Event {
fn maybe_from(input: &Input) -> Option<Self> {
match input {
Expand Down Expand Up @@ -209,6 +218,7 @@ impl RunState {
Input::PeerSync(peer_sync_input) => self.handle_peer_sync(&peer_sync_input),
Input::Request(request_input) => self.handle_request(request_input),
Input::Stats(stats_input) => self.handle_stats(stats_input),
Input::NetworkDiagnostic(ev) => vec![Command::EmitEvent(ev.into())],
};

log::trace!("TRANSITION END: {:?} {:?}", self.status, cmds);
Expand Down Expand Up @@ -318,7 +328,6 @@ impl RunState {
result,
} => {
cmds.extend(self.waiting_room.found(&urn, peer_id, SystemTime::now()));

if let PutResult::Applied(_) = result {
cmds.push(Command::Include(urn));
}
Expand Down
8 changes: 7 additions & 1 deletion daemon/src/peer/run_state/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ use std::{net::SocketAddr, time::SystemTime};

use tokio::sync::oneshot;

use librad::{git::Urn, net, net::peer::ProtocolEvent, peer::PeerId};
use librad::{
git::Urn,
net,
net::{peer::ProtocolEvent, protocol::event::NetworkDiagnosticEvent},
peer::PeerId,
};

use crate::{
peer::announcement,
Expand All @@ -31,6 +36,7 @@ pub enum Input {
/// the network.
Request(Request),
Stats(Stats),
NetworkDiagnostic(NetworkDiagnosticEvent),
}

/// Announcement subroutine lifecycle events.
Expand Down
23 changes: 23 additions & 0 deletions daemon/src/peer/subroutines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! Management of peer subroutine tasks driven by advancing the core state
//! machine with a stream of inputs, producing commands.
use librad::net::protocol::event::NetworkDiagnosticEvent;
use std::{
net::SocketAddr,
time::{Duration, SystemTime},
Expand Down Expand Up @@ -69,6 +70,7 @@ pub struct Subroutines {

impl Subroutines {
/// Constructs a new subroutines manager.
#[allow(clippy::too_many_arguments)]
pub fn new(
peer: net::peer::Peer<BoxedSigner>,
mut listen_addrs: watch::Receiver<Vec<SocketAddr>>,
Expand All @@ -77,6 +79,10 @@ impl Subroutines {
protocol_events: BoxStream<'static, Result<ProtocolEvent, net::protocol::RecvError>>,
subscriber: broadcast::Sender<Event>,
mut control_receiver: mpsc::Receiver<control::Request>,
network_diagnostic_events: BoxStream<
'static,
Result<NetworkDiagnosticEvent, net::protocol::RecvError>,
>,
) -> Self {
let announce_timer = if run_config.announce.interval.is_zero() {
None
Expand Down Expand Up @@ -126,6 +132,23 @@ impl Subroutines {
.boxed(),
);

coalesced.push(
network_diagnostic_events
.filter_map(|res| async move {
match res {
Ok(ev) => Some(Input::NetworkDiagnostic(ev)),
Err(err) => {
log::warn!(
"receive error for network diagnostic event log: {}",
err
);
None
},
}
})
.boxed(),
);

coalesced.push(
stream! {
while listen_addrs.changed().await.is_ok() {
Expand Down
2 changes: 1 addition & 1 deletion librad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ features = ["tls-rustls"]

[dependencies.radicle-data]
path = "../data"
features = ["minicbor"]
features = ["minicbor", "serde"]

[dependencies.radicle-git-ext]
path = "../git-ext"
Expand Down
8 changes: 7 additions & 1 deletion librad/src/net/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures_timer::Delay;
use thiserror::Error;
use tokio::task::spawn_blocking;

use super::protocol::{self, gossip};
use super::protocol::{self, event::NetworkDiagnosticEvent, gossip};
use crate::{
git::{self, storage::Fetchers, Urn},
signer::Signer,
Expand Down Expand Up @@ -240,6 +240,12 @@ where
self.phone.subscribe()
}

pub fn subscribe_to_diagnostic_events(
&self,
) -> impl futures::Stream<Item = Result<NetworkDiagnosticEvent, protocol::RecvError>> {
self.phone.subscribe_diagnostic_events()
}

/// Borrow a [`git::storage::Storage`] from the pool, and run a blocking
/// computation on it.
pub async fn using_storage<F, A>(&self, blocking: F) -> Result<A, StorageError>
Expand Down
1 change: 1 addition & 0 deletions librad/src/net/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ pub fn accept<Store, Disco>(
state,
incoming,
periodic,
..
}: Bound<Store>,
disco: Disco,
) -> impl Future<Output = Result<!, quic::Error>>
Expand Down
8 changes: 5 additions & 3 deletions librad/src/net/protocol/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use crate::PeerId;
mod storage;
pub use storage::{LocalStorage, PutResult};

#[derive(Clone, Debug, PartialEq, minicbor::Encode, minicbor::Decode)]
use serde::Serialize;

#[derive(Clone, Debug, PartialEq, minicbor::Encode, minicbor::Decode, Serialize)]
pub enum Message<Addr, Payload> {
#[n(0)]
#[cbor(array)]
Expand Down Expand Up @@ -46,7 +48,7 @@ pub(super) trait ErrorRateLimited {
#[derive(Debug, Error)]
pub enum Error<A, P>
where
A: Debug,
A: Debug + Clone + Ord,
P: Debug,
{
#[error("unsolicited message from {remote_id}")]
Expand All @@ -68,7 +70,7 @@ where
M: Membership,
S: LocalStorage<A, Update = P> + ErrorRateLimited,
F: Fn() -> PeerInfo<A>,
A: Clone + Debug + Send + 'static,
A: Clone + Debug + Send + Ord + 'static,
P: Clone + Debug,
{
use tick::Tock::*;
Expand Down
58 changes: 57 additions & 1 deletion librad/src/net/protocol/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

use std::{collections::HashMap, net::SocketAddr};

use super::{broadcast, error, gossip, interrogation, membership};
use super::{broadcast, error, gossip, interrogation, io::Rpc, membership};
use crate::PeerId;

use serde::Serialize;

#[derive(Clone)]
pub enum Downstream {
Gossip(downstream::Gossip),
Expand Down Expand Up @@ -169,3 +171,57 @@ pub mod upstream {
}
}
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag="type", rename_all="camelCase")]
pub enum NetworkDiagnosticEvent {
GossipSent {
to: SocketAddr,
message: broadcast::Message<SocketAddr, gossip::Payload>,
},
GossipReceived {
from: SocketAddr,
message: broadcast::Message<SocketAddr, gossip::Payload>,
},
HpvSent {
to: SocketAddr,
message: membership::Message<SocketAddr>,
},
HpvReceived {
from: SocketAddr,
message: membership::Message<SocketAddr>,
},
}

impl NetworkDiagnosticEvent {
pub(crate) fn hpv_sent(
to: SocketAddr,
message: membership::Message<SocketAddr>,
) -> NetworkDiagnosticEvent {
NetworkDiagnosticEvent::HpvSent { to, message }
}

pub(crate) fn hpv_received(
from: SocketAddr,
message: membership::Message<SocketAddr>,
) -> NetworkDiagnosticEvent {
NetworkDiagnosticEvent::HpvReceived { from, message }
}

pub(crate) fn gossip_received(
from: SocketAddr,
message: broadcast::Message<SocketAddr, gossip::Payload>,
) -> NetworkDiagnosticEvent {
NetworkDiagnosticEvent::GossipReceived { from, message }
}

pub(crate) fn rpc_sent(
to: SocketAddr,
rpc: Rpc<SocketAddr, gossip::Payload>,
) -> NetworkDiagnosticEvent {
match rpc {
Rpc::Membership(message) => NetworkDiagnosticEvent::HpvSent { to, message },
Rpc::Gossip(message) => NetworkDiagnosticEvent::GossipSent { to, message },
}
}
}
15 changes: 14 additions & 1 deletion librad/src/net/protocol/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use minicbor::{Decode, Decoder, Encode, Encoder};

use crate::{identities::git::Urn, peer::PeerId};

use serde::Serialize;

#[derive(Clone, Debug, PartialEq)]
pub enum Rev {
Git(git2::Oid),
Expand Down Expand Up @@ -43,8 +45,19 @@ impl<'de> Decode<'de> for Rev {
}
}

impl Serialize for Rev {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
Self::Git(oid) => serializer.serialize_str(oid.to_string().as_str()),
}
}
}

/// The gossip payload type
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
#[derive(Clone, Debug, PartialEq, Encode, Decode, Serialize)]
#[cbor(array)]
pub struct Payload {
/// URN of an updated or wanted repo.
Expand Down
7 changes: 4 additions & 3 deletions librad/src/net/protocol/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use std::{collections::BTreeSet, convert::TryFrom, option::NoneError};

use data::BoundedVec;
use minicbor::{Decode, Encode};
use serde::Serialize;
use typenum::U16;

use crate::peer::PeerId;

#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Encode, Decode)]
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Encode, Decode, Serialize)]
#[repr(u8)]
pub enum Capability {
#[n(0)]
Expand Down Expand Up @@ -86,7 +87,7 @@ impl<Addr> From<PeerInfo<Addr>> for (PeerId, Vec<Addr>) {
}
}

#[derive(Debug, Clone, PartialEq, Encode)]
#[derive(Debug, Clone, PartialEq, Encode, Serialize)]
#[cbor(array)]
pub struct GenericPeerInfo<Addr, T> {
#[n(0)]
Expand Down Expand Up @@ -159,7 +160,7 @@ impl<'__b777, Addr: minicbor::Decode<'__b777>, T: minicbor::Decode<'__b777>>
})
}
}
#[derive(Debug, Clone, PartialEq, Encode)]
#[derive(Debug, Clone, PartialEq, Encode, Serialize)]
#[cbor(array)]
pub struct PeerAdvertisement<Addr> {
#[n(0)]
Expand Down
12 changes: 7 additions & 5 deletions librad/src/net/protocol/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// This file is part of radicle-link, distributed under the GPLv3 with Radicle
// Linking Exception. For full terms see the included LICENSE file.

use crate::net::protocol::event::NetworkDiagnosticEvent;
use std::{iter, net::SocketAddr};

use data::BoundedVec;
Expand Down Expand Up @@ -45,11 +46,12 @@ where
}

if let Some((conn, ingress)) = connect(&state.endpoint, peer, addrs).await {
let rpc_sent = send_rpc::<_, ()>(
&conn,
state.membership.hello(peer_advertisement(&state.endpoint)),
)
.await;
let msg = state.membership.hello(peer_advertisement(&state.endpoint));
let rpc: Rpc<_, ()> = msg.clone().into();
let rpc_sent = send_rpc::<_, ()>(&conn, rpc).await;
state
.phone
.emit_diagnostic_event(NetworkDiagnosticEvent::hpv_sent(conn.remote_addr(), msg));

match rpc_sent {
Err(e) => tracing::warn!(err = ?e, "failed to send membership hello"),
Expand Down
Loading

0 comments on commit 3ccde30

Please sign in to comment.