Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add iroh-router support & optional in-memory rpc client #2

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ iroh-blobs = { version = "0.28.0" }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.28.0", optional = true }
iroh-net = { version = "0.28.0" }
iroh-router = "0.28.0"
meadowcap = "0.1.0"
nested_enum_utils = "0.1.0"
postcard = { version = "1", default-features = false, features = [ "alloc", "use-std", "experimental-derive", ] }
quic-rpc = "0.15.0"
quic-rpc = "0.15.1"
quic-rpc-derive = "0.15.0"
rand = "0.8.5"
rand_core = "0.6.4"
Expand Down
42 changes: 39 additions & 3 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
//! Engine for driving a willow store and synchronisation sessions.

use std::sync::{Arc, OnceLock};

use anyhow::Result;
use futures_lite::future::Boxed;
use futures_util::{
future::{MapErr, Shared},
FutureExt, TryFutureExt,
};
use iroh_net::{endpoint::Connection, Endpoint, NodeId};
use iroh_net::{
endpoint::{Connecting, Connection},
Endpoint, NodeId,
};
use iroh_router::ProtocolHandler;
use tokio::{
sync::{mpsc, oneshot},
task::JoinError,
Expand All @@ -14,6 +21,7 @@ use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, error_span, Instrument};

use crate::{
rpc::{client::MemClient, handler::RpcHandler},
session::{
intents::{Intent, IntentHandle},
SessionInit,
Expand All @@ -39,6 +47,7 @@ const PEER_MANAGER_INBOX_CAP: usize = 128;
#[derive(Debug, Clone)]
pub struct Engine {
actor_handle: ActorHandle,
pub(crate) endpoint: Endpoint,
peer_manager_inbox: mpsc::Sender<peer_manager::Input>,
// `Engine` needs to be `Clone + Send`, and we need to `task.await` in its `shutdown()` impl.
// So we need
Expand All @@ -47,11 +56,20 @@ pub struct Engine {
// - `AbortOnDropHandle` to make sure that the `task` is cancelled when all `Node`s are dropped
// (`Shared` acts like an `Arc` around its inner future).
peer_manager_task: Shared<MapErr<AbortOnDropHandle<Result<(), String>>, JoinErrToStr>>,
rpc_handler: Arc<OnceLock<crate::rpc::handler::RpcHandler>>,
}

pub(crate) type JoinErrToStr = Box<dyn Fn(JoinError) -> String + Send + Sync + 'static>;

impl Engine {
/// Get an in memory client to interact with the willow engine.
pub fn client(&self) -> &MemClient {
&self
.rpc_handler
.get_or_init(|| RpcHandler::new(self.clone()))
.client
}

/// Start the Willow engine.
///
/// This needs an `endpoint` to connect to other peers, and a `create_store` closure which
Expand All @@ -74,8 +92,12 @@ impl Engine {
let me = endpoint.node_id();
let actor_handle = ActorHandle::spawn(create_store, me);
let (pm_inbox_tx, pm_inbox_rx) = mpsc::channel(PEER_MANAGER_INBOX_CAP);
let peer_manager =
PeerManager::new(actor_handle.clone(), endpoint, pm_inbox_rx, accept_opts);
let peer_manager = PeerManager::new(
actor_handle.clone(),
endpoint.clone(),
pm_inbox_rx,
accept_opts,
);
let peer_manager_task = tokio::task::spawn(
async move { peer_manager.run().await.map_err(|e| e.to_string()) }
.instrument(error_span!("peer_manager", me=%me.fmt_short())),
Expand All @@ -85,8 +107,10 @@ impl Engine {
.shared();
Engine {
actor_handle,
endpoint,
peer_manager_inbox: pm_inbox_tx,
peer_manager_task,
rpc_handler: Default::default(),
}
}

Expand Down Expand Up @@ -148,3 +172,15 @@ impl std::ops::Deref for Engine {
&self.actor_handle
}
}

impl ProtocolHandler for Engine {
fn accept(self: Arc<Self>, conn: Connecting) -> Boxed<Result<()>> {
Box::pin(async move { self.handle_connection(conn.await?).await })
}

fn shutdown(self: Arc<Self>) -> Boxed<()> {
Box::pin(async move {
(&**self).shutdown().await.ok();
})
}
}
44 changes: 27 additions & 17 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,43 @@ use crate::{
store::traits::{StoreEvent, SubscribeParams},
};

/// Type alias for a memory-backed client.
pub type MemClient = Client<
quic_rpc::transport::flume::FlumeConnector<
crate::rpc::proto::Response,
crate::rpc::proto::Request,
>,
>;

/// Iroh Willow client.
#[derive(Debug, Clone, RefCast)]
#[repr(transparent)]
pub struct Client<C: quic_rpc::Connector<RpcService> = quic_rpc::client::BoxedConnector<RpcService>>
where
C: ConnectionErrors<SendError = anyhow::Error>,
{
pub(super) rpc: RpcClient<C>,
}

impl<C: quic_rpc::Connector<RpcService>> Client<C>
where
C: ConnectionErrors<SendError = anyhow::Error>,
{
impl<C: quic_rpc::Connector<RpcService>> Client<C> {
pub fn new(rpc: RpcClient<C>) -> Self {
Self { rpc }
}

pub fn boxed(self) -> Client<quic_rpc::client::BoxedConnector<RpcService>>
where
C: quic_rpc::transport::boxed::BoxableConnector<
crate::rpc::proto::Response,
crate::rpc::proto::Request,
>,
{
Client {
rpc: self.rpc.boxed(),
}
}

/// Create a new namespace in the Willow store.
pub async fn create(&self, kind: NamespaceKind, owner: UserId) -> Result<Space<C>> {
let req = CreateNamespaceRequest { kind, owner };
let res = self.rpc.rpc(req).await??;
let res = self.rpc.rpc(req).await.map_err(|e| anyhow::anyhow!(e))??;
Ok(Space::new(self.rpc.clone(), res.0))
}

Expand Down Expand Up @@ -138,11 +153,11 @@ where
let req = SyncWithPeerRequest { peer, init };
let (update_tx, event_rx) = self.rpc.bidi(req).await?;

let update_tx = SinkExt::with(
update_tx,
|update| async move { Ok(SyncWithPeerUpdate(update)) },
let update_tx: UpdateSender = Box::pin(
update_tx
.with(|update| async move { Ok(SyncWithPeerUpdate(update)) })
.sink_map_err(|e: <C as ConnectionErrors>::SendError| e.into()),
);
let update_tx: UpdateSender = Box::pin(update_tx);

let event_rx = Box::pin(event_rx.map(|res| match res {
Ok(Ok(SyncWithPeerResponse::Event(event))) => event,
Expand Down Expand Up @@ -187,17 +202,12 @@ where
/// A space to store entries in.
#[derive(Debug, Clone)]
pub struct Space<C: quic_rpc::Connector<RpcService> = quic_rpc::client::BoxedConnector<RpcService>>
where
C: ConnectionErrors<SendError = anyhow::Error>,
{
rpc: RpcClient<C>,
namespace_id: NamespaceId,
}

impl<C: quic_rpc::Connector<RpcService>> Space<C>
where
C: ConnectionErrors<SendError = anyhow::Error>,
{
impl<C: quic_rpc::Connector<RpcService>> Space<C> {
fn new(rpc: RpcClient<C>, namespace_id: NamespaceId) -> Self {
Self { rpc, namespace_id }
}
Expand Down
50 changes: 38 additions & 12 deletions src/rpc/handler.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
use anyhow::Result;
use futures_lite::Stream;
use futures_util::{SinkExt, StreamExt};
use iroh_net::Endpoint;
use quic_rpc::server::{ChannelTypes, RpcChannel, RpcServerError};
use quic_rpc::{
server::{ChannelTypes, RpcChannel, RpcServerError},
RpcClient, RpcServer,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::task::AbortOnDropHandle;

use crate::{form::EntryOrForm, rpc::proto::*, Engine};

fn map_err(err: anyhow::Error) -> RpcError {
RpcError::new(&*err)
}
use crate::{
form::EntryOrForm,
rpc::{client::MemClient, proto::*},
Engine,
};

impl Engine {
pub async fn handle_spaces_request<C: ChannelTypes<RpcService>>(
self,
endpoint: Endpoint,
msg: Request,
chan: RpcChannel<RpcService, C>,
) -> Result<(), RpcServerError<C>> {
Expand Down Expand Up @@ -165,15 +167,15 @@ impl Engine {
.await
}
Addr(msg) => {
chan.rpc(msg, endpoint, |endpoint, _req| async move {
let addr = endpoint.node_addr().await.map_err(map_err)?;
chan.rpc(msg, self, |engine, _req| async move {
let addr = engine.endpoint.node_addr().await.map_err(map_err)?;
Ok(addr)
})
.await
}
AddAddr(msg) => {
chan.rpc(msg, endpoint, |endpoint, req| async move {
endpoint.add_node_addr(req.addr).map_err(map_err)?;
chan.rpc(msg, self, |engine, req| async move {
engine.endpoint.add_node_addr(req.addr).map_err(map_err)?;
Ok(())
})
.await
Expand All @@ -182,6 +184,26 @@ impl Engine {
}
}

#[derive(derive_more::Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
#[debug("MemClient")]
pub(crate) client: MemClient,
/// Handler task
pub(crate) _handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
pub(crate) fn new(engine: Engine) -> Self {
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = MemClient::new(RpcClient::new(connector));
let _handler = listener
.spawn_accept_loop(move |req, chan| engine.clone().handle_spaces_request(req, chan));
Self { client, _handler }
}
}

// TODO: Try to use the streams directly instead of spawning two tasks.
async fn sync_with_peer(
engine: &Engine,
Expand Down Expand Up @@ -214,3 +236,7 @@ async fn sync_with_peer(
});
Ok(())
}

fn map_err(err: anyhow::Error) -> RpcError {
RpcError::new(&*err)
}
Loading
Loading