Skip to content

Commit

Permalink
chore(gossispsub): deprecate futures-ticker
Browse files Browse the repository at this point in the history
to address [RUSTSEC-2024-0384 ](https://rustsec.org/advisories/RUSTSEC-2024-0384.html). Use `futures-timer` and `Delay` instead

Pull-Request: #5674.
  • Loading branch information
jxs authored Nov 21, 2024
1 parent 13b9ea2 commit c3a21d1
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 43 deletions.
41 changes: 15 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## 0.48.0

- Deprecate `futures-ticker` and use `futures-timer` instead.
See [PR 5674](https://github.com/libp2p/rust-libp2p/pull/5674).
- Apply `max_transmit_size` to the inner message instead of the final payload.
See [PR 5642](https://github.com/libp2p/rust-libp2p/pull/5642).

Expand Down
4 changes: 2 additions & 2 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[features]
wasm-bindgen = ["getrandom/js"]
wasm-bindgen = ["getrandom/js", "futures-timer/wasm-bindgen"]

[dependencies]
asynchronous-codec = { workspace = true }
Expand All @@ -21,7 +21,6 @@ bytes = "1.6"
either = "1.11"
fnv = "1.0.7"
futures = { workspace = true }
futures-ticker = "0.0.3"
getrandom = "0.2.15"
hex_fmt = "0.3.0"
web-time = { workspace = true }
Expand All @@ -39,6 +38,7 @@ tracing = { workspace = true }

# Metrics dependencies
prometheus-client = { workspace = true }
futures-timer = "3.0.3"

[dev-dependencies]
hex = "0.4.2"
Expand Down
25 changes: 12 additions & 13 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use std::{
time::Duration,
};

use futures::StreamExt;
use futures_ticker::Ticker;
use futures::FutureExt;
use futures_timer::Delay;
use prometheus_client::registry::Registry;
use rand::{seq::SliceRandom, thread_rng};

Expand Down Expand Up @@ -283,7 +283,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
mcache: MessageCache,

/// Heartbeat interval stream.
heartbeat: Ticker,
heartbeat: Delay,

/// Number of heartbeats since the beginning of time; this allows us to amortize some resource
/// clean up -- eg backoff clean up.
Expand All @@ -301,7 +301,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Stores optional peer score data together with thresholds, decay interval and gossip
/// promises.
peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>,
peer_score: Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,

/// Counts the number of `IHAVE` received from each peer since the last heartbeat.
count_received_ihave: HashMap<PeerId, usize>,
Expand Down Expand Up @@ -448,10 +448,7 @@ where
config.backoff_slack(),
),
mcache: MessageCache::new(config.history_gossip(), config.history_length()),
heartbeat: Ticker::new_with_next(
config.heartbeat_interval(),
config.heartbeat_initial_delay(),
),
heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
heartbeat_ticks: 0,
px_peers: HashSet::new(),
outbound_peers: HashSet::new(),
Expand Down Expand Up @@ -879,7 +876,7 @@ where
return Err("Peer score set twice".into());
}

let interval = Ticker::new(params.decay_interval);
let interval = Delay::new(params.decay_interval);
let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
Ok(())
Expand Down Expand Up @@ -1145,7 +1142,7 @@ where
}

fn score_below_threshold_from_scores(
peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>,
peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,
peer_id: &PeerId,
threshold: impl Fn(&PeerScoreThresholds) -> f64,
) -> (bool, f64) {
Expand Down Expand Up @@ -3105,14 +3102,16 @@ where
}

// update scores
if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
while let Poll::Ready(Some(_)) = interval.poll_next_unpin(cx) {
if let Some((peer_score, _, delay, _)) = &mut self.peer_score {
if delay.poll_unpin(cx).is_ready() {
peer_score.refresh_scores();
delay.reset(peer_score.params.decay_interval);
}
}

while let Poll::Ready(Some(_)) = self.heartbeat.poll_next_unpin(cx) {
if self.heartbeat.poll_unpin(cx).is_ready() {
self.heartbeat();
self.heartbeat.reset(self.config.heartbeat_interval());
}

Poll::Pending
Expand Down
5 changes: 3 additions & 2 deletions protocols/gossipsub/src/peer_score.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ mod tests;
const TIME_CACHE_DURATION: u64 = 120;

pub(crate) struct PeerScore {
params: PeerScoreParams,
/// The score parameters.
pub(crate) params: PeerScoreParams,
/// The stats per PeerId.
peer_stats: HashMap<PeerId, PeerStats>,
/// Tracking peers per IP.
peer_ips: HashMap<IpAddr, HashSet<PeerId>>,
/// Message delivery tracking. This is a time-cache of [`DeliveryRecord`]s.
deliveries: TimeCache<MessageId, DeliveryRecord>,
/// callback for monitoring message delivery times
/// Callback for monitoring message delivery times.
message_delivery_time_callback: Option<fn(&PeerId, &TopicHash, f64)>,
}

Expand Down

0 comments on commit c3a21d1

Please sign in to comment.