Skip to content

Commit

Permalink
chore: discv4 touchups (paradigmxyz#6639)
Browse files Browse the repository at this point in the history
Co-authored-by: Emilia Hane <[email protected]>
  • Loading branch information
mattsse and emhane authored Feb 18, 2024
1 parent cde7a1d commit 2b92948
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 21 deletions.
2 changes: 1 addition & 1 deletion crates/net/discv4/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct Discv4Config {
pub ping_interval: Duration,
/// The duration of we consider a ping timed out.
pub ping_expiration: Duration,
/// The rate at which lookups should be triggered.
/// The rate at which new random lookups should be triggered.
pub lookup_interval: Duration,
/// The duration of we consider a FindNode request timed out.
pub request_timeout: Duration,
Expand Down
100 changes: 80 additions & 20 deletions crates/net/discv4/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use secp256k1::SecretKey;
use std::{
cell::RefCell,
collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
io,
fmt, io,
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
pin::Pin,
rc::Rc,
Expand Down Expand Up @@ -130,6 +130,12 @@ const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
/// Duration used to expire nodes from the routing table 1hr
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);

// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
//
// This will act as a manual yield point when draining the socket messages where the most CPU
// expensive part is handling outgoing messages: encoding and hashing the packet
const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;

type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;

Expand All @@ -139,6 +145,10 @@ pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;

/// The Discv4 frontend
///
/// This communicates with the [Discv4Service] by sending commands over a channel.
///
/// See also [Discv4::spawn]
#[derive(Debug, Clone)]
pub struct Discv4 {
/// The address of the udp socket
Expand Down Expand Up @@ -397,8 +407,10 @@ impl Discv4 {
}

/// Manages discv4 peer discovery over UDP.
///
/// This is a [Stream] to handles incoming and outgoing discv4 messages and emits updates via:
/// [Discv4Service::update_stream].
#[must_use = "Stream does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Discv4Service {
/// Local address of the UDP socket.
local_address: SocketAddr,
Expand All @@ -419,8 +431,12 @@ pub struct Discv4Service {
/// The routing table.
kbuckets: KBucketsTable<NodeKey, NodeEntry>,
/// Receiver for incoming messages
///
/// Receives incoming messages from the UDP task.
ingress: IngressReceiver,
/// Sender for sending outgoing messages
///
/// Sends outgoind messages to the UDP task.
egress: EgressSender,
/// Buffered pending pings to apply backpressure.
///
Expand All @@ -446,7 +462,7 @@ pub struct Discv4Service {
commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
/// All subscribers for table updates
update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
/// The interval when to trigger lookups
/// The interval when to trigger random lookups
lookup_interval: Interval,
/// Used to rotate targets to lookup
lookup_rotator: LookupTargetRotator,
Expand Down Expand Up @@ -577,7 +593,7 @@ impl Discv4Service {
}
}

/// Returns the current enr sequence
/// Returns the current enr sequence of the local record.
fn enr_seq(&self) -> Option<u64> {
(self.config.enable_eip868).then(|| self.local_eip_868_enr.seq())
}
Expand All @@ -601,19 +617,19 @@ impl Discv4Service {
}

/// Returns the [PeerId] that identifies this node
pub fn local_peer_id(&self) -> &PeerId {
pub const fn local_peer_id(&self) -> &PeerId {
&self.local_node_record.id
}

/// Returns the address of the UDP socket
pub fn local_addr(&self) -> SocketAddr {
pub const fn local_addr(&self) -> SocketAddr {
self.local_address
}

/// Returns the ENR of this service.
///
/// Note: this will include the external address if resolved.
pub fn local_enr(&self) -> NodeRecord {
pub const fn local_enr(&self) -> NodeRecord {
self.local_node_record
}

Expand Down Expand Up @@ -680,7 +696,7 @@ impl Discv4Service {
})
}

/// Creates a new channel for [`DiscoveryUpdate`]s
/// Creates a new bounded channel for [`DiscoveryUpdate`]s.
pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
let (tx, rx) = mpsc::channel(512);
self.update_listeners.push(tx);
Expand Down Expand Up @@ -762,7 +778,9 @@ impl Discv4Service {
self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
}

/// Notifies all listeners
/// Notifies all listeners.
///
/// Removes all listeners that are closed.
fn notify(&mut self, update: DiscoveryUpdate) {
self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
Ok(()) => true,
Expand Down Expand Up @@ -855,6 +873,7 @@ impl Discv4Service {
}
}
(Some(_), None) => {
// got an ENR
self.send_enr_request(record);
}
_ => {}
Expand Down Expand Up @@ -949,6 +968,8 @@ impl Discv4Service {
}
_ => return false,
}

// send the initial ping to the _new_ node
self.try_ping(record, PingReason::InitialInsert);
true
}
Expand Down Expand Up @@ -1540,10 +1561,11 @@ impl Discv4Service {
}

// trigger self lookup
if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() {
let _ = self.lookup_interval.poll_tick(cx);
let target = self.lookup_rotator.next(&self.local_node_record.id);
self.lookup_with(target, None);
if self.config.enable_lookup {
while self.lookup_interval.poll_tick(cx).is_ready() {
let target = self.lookup_rotator.next(&self.local_node_record.id);
self.lookup_with(target, None);
}
}

// re-ping some peers
Expand All @@ -1558,7 +1580,7 @@ impl Discv4Service {
self.set_external_ip_addr(ip);
}

// process all incoming commands, this channel can never close
// drain all incoming `Discv4` commands, this channel can never close
while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
match cmd {
Discv4Command::Add(enr) => {
Expand Down Expand Up @@ -1608,6 +1630,9 @@ impl Discv4Service {
}
}

// restricts how many messages we process in a single poll before yielding back control
let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;

// process all incoming datagrams
while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
match event {
Expand Down Expand Up @@ -1649,19 +1674,30 @@ impl Discv4Service {
self.queued_events.push_back(event);
}
}

udp_message_budget -= 1;
if udp_message_budget < 0 {
trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
if self.queued_events.is_empty() {
// we've exceeded the message budget and have no events to process
// this will make sure we're woken up again
cx.waker().wake_by_ref();
}
break
}
}

// try resending buffered pings
self.ping_buffered();

// evict expired nodes
while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
self.evict_expired_requests(Instant::now())
self.evict_expired_requests(Instant::now());
}

// evict expired nodes
while self.expire_interval.poll_tick(cx).is_ready() {
self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION)
self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
}

if self.queued_events.is_empty() {
Expand All @@ -1686,6 +1722,20 @@ impl Stream for Discv4Service {
}
}

impl fmt::Debug for Discv4Service {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Discv4Service")
.field("local_address", &self.local_address)
.field("local_peer_id", &self.local_peer_id())
.field("local_node_record", &self.local_node_record)
.field("queued_pings", &self.queued_pings)
.field("pending_lookup", &self.pending_lookup)
.field("pending_find_nodes", &self.pending_find_nodes)
.field("lookup_interval", &self.lookup_interval)
.finish_non_exhaustive()
}
}

/// The Event type the Service stream produces.
///
/// This is mainly used for testing purposes and represents messages the service processed
Expand Down Expand Up @@ -1763,7 +1813,7 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
}
}

/// The commands sent from the frontend to the service
/// The commands sent from the frontend [Discv4] to the service [Discv4Service].
enum Discv4Command {
Add(NodeRecord),
SetTcpPort(u16),
Expand All @@ -1779,6 +1829,7 @@ enum Discv4Command {
}

/// Event type receiver produces
#[derive(Debug)]
pub(crate) enum IngressEvent {
/// Encountered an error when reading a datagram message.
RecvError(io::Error),
Expand All @@ -1789,6 +1840,7 @@ pub(crate) enum IngressEvent {
}

/// Tracks a sent ping
#[derive(Debug)]
struct PingRequest {
// Timestamp when the request was sent.
sent_at: Instant,
Expand Down Expand Up @@ -1953,6 +2005,9 @@ struct LookupContextInner {
/// The closest nodes
closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
/// A listener for all the nodes retrieved in this lookup
///
/// This is present if the lookup was triggered manually via [Discv4] and we want to return all
/// the nodes once the lookup finishes.
listener: Option<NodeRecordSender>,
}

Expand Down Expand Up @@ -1981,6 +2036,7 @@ struct QueryNode {
responded: bool,
}

#[derive(Debug)]
struct FindNodeRequest {
// Timestamp when the request was sent.
sent_at: Instant,
Expand All @@ -2000,6 +2056,7 @@ impl FindNodeRequest {
}
}

#[derive(Debug)]
struct EnrRequestState {
// Timestamp when the request was sent.
sent_at: Instant,
Expand Down Expand Up @@ -2081,6 +2138,7 @@ impl NodeEntry {
}

/// Represents why a ping is issued
#[derive(Debug)]
enum PingReason {
/// Initial ping to a previously unknown peer that was inserted into the table.
InitialInsert,
Expand All @@ -2089,7 +2147,7 @@ enum PingReason {
EstablishBond,
/// Re-ping a peer.
RePing,
/// Part of a lookup to ensure endpoint is proven.
/// Part of a lookup to ensure endpoint is proven before we can send a FindNode request.
Lookup(NodeRecord, LookupContext),
}

Expand Down Expand Up @@ -2212,9 +2270,10 @@ mod tests {
}
}

#[tokio::test]
// Bootstraps with mainnet boot nodes
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_lookup() {
async fn test_mainnet_lookup() {
reth_tracing::init_test_tracing();
let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

Expand Down Expand Up @@ -2396,6 +2455,7 @@ mod tests {
// done
assert_eq!(service.pending_find_nodes.len(), 2);
}

#[tokio::test]
async fn test_no_local_in_closest() {
reth_tracing::init_test_tracing();
Expand Down

0 comments on commit 2b92948

Please sign in to comment.