Skip to content

Commit

Permalink
Merge pull request #9 from n0-computer/cli
Browse files Browse the repository at this point in the history
feat: add cli
  • Loading branch information
rklaehn authored Nov 18, 2024
2 parents ba0f6b0 + 9968d39 commit 7c90c3f
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ quic-rpc = { version = "0.15.1", optional = true }
quic-rpc-derive = { version = "0.15.0", optional = true }
strum = { version = "0.26", optional = true }
serde-error = "0.1.3"
clap = { version = "4", features = ["derive"], optional = true }
hex = { version = "0.4.3", optional = true }

[dev-dependencies]
clap = { version = "4", features = ["derive"] }
Expand Down Expand Up @@ -85,6 +87,11 @@ rpc = [
"dep:quic-rpc-derive",
"dep:strum",
]
cli = [
"rpc",
"dep:clap",
"dep:hex"
]

[[example]]
name = "chat"
Expand Down
128 changes: 128 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//! Define the gossiping subcommands.

use std::str::FromStr as _;

use anyhow::{Context, Result};
use clap::{ArgGroup, Subcommand};
use futures_lite::StreamExt;
use futures_util::SinkExt;
use iroh_net::NodeId;
use tokio::io::AsyncBufReadExt;

use crate::rpc::client::SubscribeOpts;

/// Commands to manage gossiping.
#[derive(Subcommand, Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum GossipCommands {
/// Subscribe to a gossip topic
#[command(
long_about = r#"Subscribe to a gossip topic
Example usage:
$ <cmd> gossip subscribe --topic test --start
This will print the current node's id. Open another terminal
or another machine and you can join the same topic:
# on another machine/terminal
$ <cmd> gossip subscribe --topic test <other node_id> --start
Any lines entered in stdin will be sent to the given topic
and received messages will be printed to stdout line-by-line.
The process waits for Ctrl+C to exit."#,
group(
ArgGroup::new("input")
.required(true)
.args(&["topic", "raw_topic"])
)
)]
Subscribe {
/// The topic to subscribe to.
///
/// This will be hashed with BLAKE3 to get the actual topic ID.
#[clap(long)]
topic: Option<String>,
/// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters.
#[clap(long)]
raw_topic: Option<String>,
/// The set of nodes that are also part of the gossip swarm to bootstrap with.
///
/// If empty, this will bootstrap a new swarm. Running the command will print
/// the node's `NodeId`, which can be used as the bootstrap argument in other nodes.
bootstrap: Vec<String>,
/// If enabled, all gossip events will be printed, including neighbor up/down events.
#[clap(long, short)]
verbose: bool,
},
}

impl GossipCommands {
/// Runs the gossip command given the iroh client.
pub async fn run(self, gossip: &crate::rpc::client::Client) -> Result<()> {
match self {
Self::Subscribe {
topic,
raw_topic,
bootstrap,
verbose,
} => {
let bootstrap = bootstrap
.into_iter()
.map(|node_id| NodeId::from_str(&node_id).map_err(|e| {
anyhow::anyhow!("Failed to parse bootstrap node id \"{node_id}\": {e}\nMust be a valid base32-encoded iroh node id.")
}))
.collect::<Result<_, _>>()?;

let topic = match (topic, raw_topic) {
(Some(topic), None) => blake3::hash(topic.as_bytes()).into(),
(None, Some(raw_topic)) => {
let mut slice = [0; 32];
hex::decode_to_slice(raw_topic, &mut slice)
.context("failed to decode raw topic")?;
slice.into()
}
_ => anyhow::bail!("either topic or raw_topic must be provided"),
};

let opts = SubscribeOpts {
bootstrap,
subscription_capacity: 1024,
};

let (mut sink, mut stream) = gossip.subscribe_with_opts(topic, opts).await?;
let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines();
loop {
tokio::select! {
line = input_lines.next_line() => {
let line = line.context("failed to read from stdin")?;
if let Some(line) = line {
sink.send(crate::net::Command::Broadcast(line.into())).await?;
} else {
break;
}
}
res = stream.next() => {
let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?;
match res {
crate::net::Event::Gossip(event) => {
if verbose {
println!("{:?}", event);
} else if let crate::net::GossipEvent::Received(crate::net::Message { content, .. }) = event {
println!("{:?}", content);
}
}
crate::net::Event::Lagged => {
anyhow::bail!("gossip stream lagged");
}
};
}
}
}
}
}
Ok(())
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub mod metrics;
pub mod net;
pub mod proto;

#[cfg(feature = "cli")]
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "cli")))]
pub mod cli;
#[cfg(feature = "rpc")]
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))]
pub mod rpc;
Expand Down

0 comments on commit 7c90c3f

Please sign in to comment.