diff --git a/Cargo.lock b/Cargo.lock index 57f6df3862f..ec986f43d57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1629,9 +1629,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1639,9 +1639,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" @@ -1657,9 +1657,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -1688,9 +1688,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1710,26 +1710,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" - -[[package]] -name = "futures-ticker" -version = "0.0.3" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9763058047f713632a52e916cc7f6a4b3fc6e9fc1ff8c5b1dc49e5a89041682e" -dependencies = [ - "futures", - "futures-timer", - "instant", -] +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-timer" @@ -1743,9 +1732,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -2743,7 +2732,7 @@ dependencies = [ "either", "fnv", "futures", - "futures-ticker", + "futures-timer", "getrandom 0.2.15", "hex", "hex_fmt", diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 7357170ff93..7bf021c761e 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.48.0 +- Deprecate `futures-ticker` and use `futures-timer` instead. + See [PR 5674](https://github.com/libp2p/rust-libp2p/pull/5674). - Apply `max_transmit_size` to the inner message instead of the final payload. See [PR 5642](https://github.com/libp2p/rust-libp2p/pull/5642). diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index ca6185a85e4..1d58fc98896 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -11,7 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [features] -wasm-bindgen = ["getrandom/js"] +wasm-bindgen = ["getrandom/js", "futures-timer/wasm-bindgen"] [dependencies] asynchronous-codec = { workspace = true } @@ -21,7 +21,6 @@ bytes = "1.6" either = "1.11" fnv = "1.0.7" futures = { workspace = true } -futures-ticker = "0.0.3" getrandom = "0.2.15" hex_fmt = "0.3.0" web-time = { workspace = true } @@ -39,6 +38,7 @@ tracing = { workspace = true } # Metrics dependencies prometheus-client = { workspace = true } +futures-timer = "3.0.3" [dev-dependencies] hex = "0.4.2" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index bf94a5b7920..d0fd3127f72 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -29,8 +29,8 @@ use std::{ time::Duration, }; -use futures::StreamExt; -use futures_ticker::Ticker; +use futures::FutureExt; +use futures_timer::Delay; use prometheus_client::registry::Registry; use rand::{seq::SliceRandom, thread_rng}; @@ -283,7 +283,7 @@ pub struct Behaviour { mcache: MessageCache, /// Heartbeat interval stream. - heartbeat: Ticker, + heartbeat: Delay, /// Number of heartbeats since the beginning of time; this allows us to amortize some resource /// clean up -- eg backoff clean up. @@ -301,7 +301,7 @@ pub struct Behaviour { /// Stores optional peer score data together with thresholds, decay interval and gossip /// promises. - peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>, + peer_score: Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>, /// Counts the number of `IHAVE` received from each peer since the last heartbeat. count_received_ihave: HashMap, @@ -448,10 +448,7 @@ where config.backoff_slack(), ), mcache: MessageCache::new(config.history_gossip(), config.history_length()), - heartbeat: Ticker::new_with_next( - config.heartbeat_interval(), - config.heartbeat_initial_delay(), - ), + heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()), heartbeat_ticks: 0, px_peers: HashSet::new(), outbound_peers: HashSet::new(), @@ -879,7 +876,7 @@ where return Err("Peer score set twice".into()); } - let interval = Ticker::new(params.decay_interval); + let interval = Delay::new(params.decay_interval); let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback); self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default())); Ok(()) @@ -1145,7 +1142,7 @@ where } fn score_below_threshold_from_scores( - peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>, + peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>, peer_id: &PeerId, threshold: impl Fn(&PeerScoreThresholds) -> f64, ) -> (bool, f64) { @@ -3105,14 +3102,16 @@ where } // update scores - if let Some((peer_score, _, interval, _)) = &mut self.peer_score { - while let Poll::Ready(Some(_)) = interval.poll_next_unpin(cx) { + if let Some((peer_score, _, delay, _)) = &mut self.peer_score { + if delay.poll_unpin(cx).is_ready() { peer_score.refresh_scores(); + delay.reset(peer_score.params.decay_interval); } } - while let Poll::Ready(Some(_)) = self.heartbeat.poll_next_unpin(cx) { + if self.heartbeat.poll_unpin(cx).is_ready() { self.heartbeat(); + self.heartbeat.reset(self.config.heartbeat_interval()); } Poll::Pending diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index ac24fc91970..4df8f162ed9 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -44,14 +44,15 @@ mod tests; const TIME_CACHE_DURATION: u64 = 120; pub(crate) struct PeerScore { - params: PeerScoreParams, /// The score parameters. + pub(crate) params: PeerScoreParams, + /// The stats per PeerId. peer_stats: HashMap, /// Tracking peers per IP. peer_ips: HashMap>, /// Message delivery tracking. This is a time-cache of [`DeliveryRecord`]s. deliveries: TimeCache, - /// callback for monitoring message delivery times + /// Callback for monitoring message delivery times. message_delivery_time_callback: Option, }