Skip to content

Commit

Permalink
Merge pull request #4 from n0-computer/rpc
Browse files Browse the repository at this point in the history
feat!: move rpc types, rpc client, and rpc request handler from iroh to this crate
  • Loading branch information
rklaehn authored Oct 29, 2024
2 parents 85c32cd + 9ad456a commit 77d392c
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 3 deletions.
79 changes: 77 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "
tokio-util = { version = "0.7.12", optional = true, features = ["codec", "rt"] }
tracing = "0.1"

# rpc dependencies (optional)
nested_enum_utils = { version = "0.1.0", optional = true }
quic-rpc = { version = "0.13", optional = true }
quic-rpc-derive = { version = "0.13", optional = true }
strum = { version = "0.26", optional = true }
serde-error = "0.1.3"

[dev-dependencies]
clap = { version = "4", features = ["derive"] }
iroh-net = { version = "0.27.0", default-features = false, features = ["test-utils"] }
Expand All @@ -61,7 +68,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.4.0"

[features]
default = ["net"]
default = ["net", "rpc"]
net = [
"dep:futures-lite",
"dep:iroh-net",
Expand All @@ -72,6 +79,12 @@ net = [
"dep:futures-util",
"dep:futures-concurrency"
]
rpc = [
"dep:nested_enum_utils",
"dep:quic-rpc",
"dep:quic-rpc-derive",
"dep:strum",
]

[[example]]
name = "chat"
Expand Down
10 changes: 10 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,13 @@ pub mod metrics;
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "net")))]
pub mod net;
pub mod proto;

#[cfg(feature = "rpc")]
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))]
pub mod rpc;
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))]
#[cfg(feature = "rpc")]
pub use rpc::{
client::Client as RpcClient,
proto::{Request as RpcRequest, Response as RpcResponse, RpcService},
};
36 changes: 36 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//! Provides a rpc protocol as well as a client for the protocol
use crate::net::Gossip;
pub use crate::net::{Command as SubscribeUpdate, Event as SubscribeResponse};
pub mod client;
pub mod proto;

impl Gossip {
/// Handle a gossip request from the RPC server.
pub async fn handle_rpc_request<S: quic_rpc::Service, C: quic_rpc::ServiceEndpoint<S>>(
&self,
msg: crate::rpc::proto::Request,
chan: quic_rpc::server::RpcChannel<crate::rpc::proto::RpcService, C, S>,
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
use quic_rpc::server::RpcServerError;

use crate::rpc::proto::Request::*;
match msg {
Subscribe(msg) => {
let this = self.clone();
chan.bidi_streaming(msg, this, move |handler, req, updates| {
let stream = handler.join_with_stream(
req.topic,
crate::net::JoinOptions {
bootstrap: req.bootstrap,
subscription_capacity: req.subscription_capacity,
},
Box::pin(updates),
);
futures_util::TryStreamExt::map_err(stream, |e| serde_error::Error::new(&*e))
})
.await
}
Update(_msg) => Err(RpcServerError::UnexpectedUpdateMessage),
}
}
}
111 changes: 111 additions & 0 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//! Iroh gossip client.
//!
//! Create a [`Client`] with a [`quic_rpc::RpcClient`] and use it to interact
//! with a node that runs gossip.
use std::collections::BTreeSet;

use anyhow::Result;
use futures_lite::{Stream, StreamExt};
use futures_util::{Sink, SinkExt};
use iroh_net::NodeId;
use quic_rpc::client::BoxedServiceConnection;

use crate::{
net::{Command as SubscribeUpdate, Event as SubscribeResponse},
proto::TopicId,
rpc::proto::{RpcService, SubscribeRequest},
};

/// Iroh gossip client.
#[derive(Debug, Clone)]
pub struct Client<S = RpcService, C = BoxedServiceConnection<S>> {
pub(super) rpc: quic_rpc::RpcClient<RpcService, C, S>,
}

/// Options for subscribing to a gossip topic.
#[derive(Debug, Clone)]
pub struct SubscribeOpts {
/// Bootstrap nodes to connect to.
pub bootstrap: BTreeSet<NodeId>,
/// Subscription capacity.
pub subscription_capacity: usize,
}

impl Default for SubscribeOpts {
fn default() -> Self {
Self {
bootstrap: BTreeSet::new(),
subscription_capacity: 256,
}
}
}

impl<S, C> Client<S, C>
where
S: quic_rpc::Service,
C: quic_rpc::ServiceConnection<S>,
{
/// Creates a new gossip client.
pub fn new(rpc: quic_rpc::RpcClient<RpcService, C, S>) -> Self {
Self { rpc }
}

/// Subscribes to a gossip topic.
///
/// Returns a sink to send updates to the topic and a stream of responses.
///
/// Updates are either [Broadcast](crate::net::Command::Broadcast)
/// or [BroadcastNeighbors](crate::net::Command::BroadcastNeighbors).
///
/// Broadcasts are gossiped to the entire swarm, while BroadcastNeighbors are sent to
/// just the immediate neighbors of the node.
///
/// Responses are either [Gossip](crate::net::Event::Gossip) or
/// [Lagged](crate::net::Event::Lagged).
///
/// Gossip events contain the actual message content, as well as information about the
/// immediate neighbors of the node.
///
/// A Lagged event indicates that the gossip stream has not been consumed quickly enough.
/// You can adjust the buffer size with the [`SubscribeOpts::subscription_capacity`] option.
pub async fn subscribe_with_opts(
&self,
topic: TopicId,
opts: SubscribeOpts,
) -> Result<(
impl Sink<SubscribeUpdate, Error = anyhow::Error>,
impl Stream<Item = Result<SubscribeResponse>>,
)> {
let (sink, stream) = self
.rpc
.bidi(SubscribeRequest {
topic,
bootstrap: opts.bootstrap,
subscription_capacity: opts.subscription_capacity,
})
.await?;
let stream = stream.map(|item| anyhow::Ok(item??));
let sink = sink.sink_map_err(|_| anyhow::anyhow!("send error"));
Ok((sink, stream))
}

/// Subscribes to a gossip topic with default options.
pub async fn subscribe(
&self,
topic: impl Into<TopicId>,
bootstrap: impl IntoIterator<Item = impl Into<NodeId>>,
) -> Result<(
impl Sink<SubscribeUpdate, Error = anyhow::Error>,
impl Stream<Item = Result<SubscribeResponse>>,
)> {
let bootstrap = bootstrap.into_iter().map(Into::into).collect();
self.subscribe_with_opts(
topic.into(),
SubscribeOpts {
bootstrap,
..Default::default()
},
)
.await
}
}
51 changes: 51 additions & 0 deletions src/rpc/proto.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//! The RPC protocol between client and node
use std::collections::BTreeSet;

use iroh_net::NodeId;
use nested_enum_utils::enum_conversions;
use quic_rpc_derive::rpc_requests;
use serde::{Deserialize, Serialize};

pub use crate::net::{Command as SubscribeUpdate, Event as SubscribeResponse};
use crate::proto::TopicId;

/// The RPC service type for the gossip protocol
#[derive(Debug, Clone)]
pub struct RpcService;

impl quic_rpc::Service for RpcService {
type Req = Request;
type Res = Response;
}

type RpcResult<T> = std::result::Result<T, serde_error::Error>;

#[allow(missing_docs)]
#[derive(strum::Display, Debug, Serialize, Deserialize)]
#[enum_conversions]
#[rpc_requests(RpcService)]
pub enum Request {
#[bidi_streaming(update = SubscribeUpdate, response = RpcResult<SubscribeResponse>)]
Subscribe(SubscribeRequest),
Update(SubscribeUpdate),
}

#[allow(missing_docs)]
#[derive(strum::Display, Debug, Serialize, Deserialize)]
#[enum_conversions]
pub enum Response {
Subscribe(RpcResult<SubscribeResponse>),
}

/// A request to the node to subscribe to gossip events.
///
/// This is basically a topic and additional options
#[derive(Serialize, Deserialize, Debug)]
pub struct SubscribeRequest {
/// The topic to subscribe to
pub topic: TopicId,
/// The nodes to bootstrap the subscription from
pub bootstrap: BTreeSet<NodeId>,
/// The capacity of the subscription
pub subscription_capacity: usize,
}

0 comments on commit 77d392c

Please sign in to comment.