Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Commit

Permalink
cli: Add timing information to rad-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudhead committed Mar 19, 2024
1 parent ddd4bde commit f15afa8
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 15 deletions.
33 changes: 28 additions & 5 deletions radicle-cli/src/commands/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Options
--seed <nid> Sync with the given node (may be specified multiple times)
-r, --replicas <count> Sync with a specific number of seeds
-v, --verbose Verbose output
--debug Print debug information afer sync
--help Print help
"#,
};
Expand Down Expand Up @@ -171,6 +172,7 @@ pub enum SyncDirection {
#[derive(Default, Debug)]
pub struct Options {
pub rid: Option<RepoId>,
pub debug: bool,
pub verbose: bool,
pub timeout: time::Duration,
pub sort_by: SortBy,
Expand All @@ -188,13 +190,17 @@ impl Args for Options {
let mut fetch = false;
let mut announce = false;
let mut inventory = false;
let mut debug = false;
let mut replicas = None;
let mut seeds = BTreeSet::new();
let mut sort_by = SortBy::default();
let mut op: Option<Operation> = None;

while let Some(arg) = parser.next()? {
match arg {
Long("debug") => {
debug = true;
}
Long("verbose") | Short('v') => {
verbose = true;
}
Expand Down Expand Up @@ -279,6 +285,7 @@ impl Args for Options {
Ok((
Options {
rid,
debug,
verbose,
timeout,
sort_by,
Expand Down Expand Up @@ -334,7 +341,14 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
}
}
if [SyncDirection::Announce, SyncDirection::Both].contains(&direction) {
announce_refs(rid, settings, options.timeout, &mut node, &profile)?;
announce_refs(
rid,
settings,
options.timeout,
options.debug,
&mut node,
&profile,
)?;
}
}
Operation::Synchronize(SyncMode::Inventory) => {
Expand Down Expand Up @@ -417,6 +431,7 @@ fn announce_refs(
rid: RepoId,
settings: RepoSync,
timeout: time::Duration,
debug: bool,
node: &mut Node,
profile: &Profile,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -469,18 +484,18 @@ fn announce_refs(
return Ok(());
}

let mut spinner = term::spinner(format!("Syncing with {} node(s)..", unsynced.len()));
let mut spinner = term::spinner(format!("Found {} seed(s)..", unsynced.len()));
let result = node.announce(rid, unsynced, timeout, |event, replicas| match event {
node::AnnounceEvent::Announced => ControlFlow::Continue(()),
node::AnnounceEvent::RefsSynced { remote } => {
spinner.message(format!("Synced with {remote}.."));
node::AnnounceEvent::RefsSynced { remote, time } => {
spinner.message(format!("Synced with {remote} in {time:?}.."));

// We're done syncing when both of these conditions are met:
//
// 1. We've matched or exceeded our target replica count.
// 2. We've synced with the seeds specified manually.
if replicas.len() >= settings.replicas
&& settings.seeds.iter().all(|s| replicas.contains(s))
&& settings.seeds.iter().all(|s| replicas.contains_key(s))
{
ControlFlow::Break(())
} else {
Expand All @@ -494,6 +509,14 @@ fn announce_refs(
} else {
spinner.message(format!("Synced with {} node(s)", result.synced.len()));
spinner.finish();
if debug {
for (seed, time) in &result.synced {
term::println(
" ",
term::format::dim(format!("Synced with {seed} in {time:?}")),
);
}
}
}
for seed in result.timeout {
term::notice!("Seed {seed} timed out..");
Expand Down
4 changes: 2 additions & 2 deletions radicle-cli/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ fn announce_(rid: RepoId, node: &mut Node) -> Result<(), radicle::node::Error> {
Duration::from_secs(9),
|event, _| match event {
node::AnnounceEvent::Announced => ControlFlow::Continue(()),
node::AnnounceEvent::RefsSynced { remote } => {
spinner.message(format!("Synced with {remote}.."));
node::AnnounceEvent::RefsSynced { remote, time } => {
spinner.message(format!("Synced with {remote} in {time:?}.."));
ControlFlow::Continue(())
}
},
Expand Down
7 changes: 5 additions & 2 deletions radicle-remote-helper/src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,9 +706,12 @@ fn sync(
DEFAULT_SYNC_TIMEOUT,
|event, _| match event {
node::AnnounceEvent::Announced => ControlFlow::Continue(()),
node::AnnounceEvent::RefsSynced { remote } => {
node::AnnounceEvent::RefsSynced { remote, time } => {
replicated.insert(remote);
spinner.message(format!("Synced with {}..", cli::format::dim(remote)));
spinner.message(format!(
"Synced with {} in {time:?}..",
cli::format::dim(remote)
));
ControlFlow::Continue(())
}
},
Expand Down
25 changes: 19 additions & 6 deletions radicle/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,17 @@ pub struct AnnounceResult {
/// Nodes that timed out.
pub timeout: Vec<NodeId>,
/// Nodes that synced.
pub synced: Vec<NodeId>,
pub synced: Vec<(NodeId, time::Duration)>,
}

/// A sync event, emitted by [`Node::announce`].
#[derive(Debug)]
pub enum AnnounceEvent {
/// Refs were synced with the given node.
RefsSynced { remote: NodeId },
RefsSynced {
remote: NodeId,
time: time::Duration,
},
/// Refs were announced to all given nodes.
Announced,
}
Expand Down Expand Up @@ -914,14 +917,15 @@ impl Node {
rid: RepoId,
seeds: impl IntoIterator<Item = NodeId>,
timeout: time::Duration,
mut callback: impl FnMut(AnnounceEvent, &HashSet<PublicKey>) -> ControlFlow<()>,
mut callback: impl FnMut(AnnounceEvent, &HashMap<PublicKey, time::Duration>) -> ControlFlow<()>,
) -> Result<AnnounceResult, Error> {
let events = self.subscribe(timeout)?;
let refs = self.announce_refs(rid)?;

let mut unsynced = seeds.into_iter().collect::<BTreeSet<_>>();
let mut synced = HashSet::new();
let mut synced = HashMap::new();
let mut timeout: Vec<NodeId> = Vec::new();
let started = time::Instant::now();

callback(AnnounceEvent::Announced, &synced);

Expand All @@ -932,13 +936,22 @@ impl Node {
rid: rid_,
at,
}) if rid == rid_ && refs.at == at => {
let elapsed = started.elapsed();
log::debug!(target: "radicle", "Received {e:?}");

unsynced.remove(&remote);
// We can receive synced events from nodes we didn't directly announce to,
// and it's possible to receive duplicates as well.
if synced.insert(remote) {
if callback(AnnounceEvent::RefsSynced { remote }, &synced).is_break() {
if synced.insert(remote, elapsed).is_none() {
if callback(
AnnounceEvent::RefsSynced {
remote,
time: elapsed,
},
&synced,
)
.is_break()
{
break;
}
}
Expand Down

0 comments on commit f15afa8

Please sign in to comment.