diff --git a/crates/net/discv4/src/config.rs b/crates/net/discv4/src/config.rs
index b9ce366e8b43..1f90169b55ce 100644
--- a/crates/net/discv4/src/config.rs
+++ b/crates/net/discv4/src/config.rs
@@ -34,7 +34,7 @@ pub struct Discv4Config {
     pub ping_interval: Duration,
     /// The duration of we consider a ping timed out.
     pub ping_expiration: Duration,
-    /// The rate at which lookups should be triggered.
+    /// The rate at which new random lookups should be triggered.
     pub lookup_interval: Duration,
     /// The duration of we consider a FindNode request timed out.
     pub request_timeout: Duration,
diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs
index 2b1dc0ba8e0f..784901e586a6 100644
--- a/crates/net/discv4/src/lib.rs
+++ b/crates/net/discv4/src/lib.rs
@@ -47,7 +47,7 @@ use secp256k1::SecretKey;
 use std::{
     cell::RefCell,
     collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
-    io,
+    fmt, io,
     net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
     pin::Pin,
     rc::Rc,
@@ -130,6 +130,12 @@ const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
 /// Duration used to expire nodes from the routing table 1hr
 const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
 
+// Restricts how many UDP messages can be processed in a single [Discv4Service::poll] call.
+//
+// This will act as a manual yield point when draining the socket messages, where the most
+// CPU-expensive part is handling outgoing messages: encoding and hashing the packet.
+const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
+
 type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
 type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
 
@@ -139,6 +145,10 @@ pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
 type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
 
 /// The Discv4 frontend
+///
+/// This communicates with the [Discv4Service] by sending commands over a channel.
+///
+/// See also [Discv4::spawn]
 #[derive(Debug, Clone)]
 pub struct Discv4 {
     /// The address of the udp socket
@@ -397,8 +407,10 @@ impl Discv4 {
 }
 
 /// Manages discv4 peer discovery over UDP.
+///
+/// This is a [Stream] that handles incoming and outgoing discv4 messages and emits updates via
+/// [Discv4Service::update_stream].
 #[must_use = "Stream does nothing unless polled"]
-#[allow(missing_debug_implementations)]
 pub struct Discv4Service {
     /// Local address of the UDP socket.
     local_address: SocketAddr,
@@ -419,8 +431,12 @@ pub struct Discv4Service {
     /// The routing table.
     kbuckets: KBucketsTable<NodeKey, NodeEntry>,
     /// Receiver for incoming messages
+    ///
+    /// Receives incoming messages from the UDP task.
     ingress: IngressReceiver,
     /// Sender for sending outgoing messages
+    ///
+    /// Sends outgoing messages to the UDP task.
     egress: EgressSender,
     /// Buffered pending pings to apply backpressure.
     ///
@@ -446,7 +462,7 @@ pub struct Discv4Service {
     commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
     /// All subscribers for table updates
     update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
-    /// The interval when to trigger lookups
+    /// The interval at which to trigger random lookups
     lookup_interval: Interval,
     /// Used to rotate targets to lookup
     lookup_rotator: LookupTargetRotator,
@@ -577,7 +593,7 @@ impl Discv4Service {
         }
     }
 
-    /// Returns the current enr sequence
+    /// Returns the current enr sequence of the local record.
     fn enr_seq(&self) -> Option<u64> {
         (self.config.enable_eip868).then(|| self.local_eip_868_enr.seq())
     }
@@ -601,19 +617,19 @@ impl Discv4Service {
     }
 
     /// Returns the [PeerId] that identifies this node
-    pub fn local_peer_id(&self) -> &PeerId {
+    pub const fn local_peer_id(&self) -> &PeerId {
        &self.local_node_record.id
     }
 
     /// Returns the address of the UDP socket
-    pub fn local_addr(&self) -> SocketAddr {
+    pub const fn local_addr(&self) -> SocketAddr {
         self.local_address
     }
 
     /// Returns the ENR of this service.
     ///
     /// Note: this will include the external address if resolved.
-    pub fn local_enr(&self) -> NodeRecord {
+    pub const fn local_enr(&self) -> NodeRecord {
         self.local_node_record
     }
 
@@ -680,7 +696,7 @@ impl Discv4Service {
         })
     }
 
-    /// Creates a new channel for [`DiscoveryUpdate`]s
+    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
     pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
         let (tx, rx) = mpsc::channel(512);
         self.update_listeners.push(tx);
@@ -762,7 +778,9 @@ impl Discv4Service {
         self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
     }
 
-    /// Notifies all listeners
+    /// Notifies all listeners.
+    ///
+    /// Removes all listeners that are closed.
     fn notify(&mut self, update: DiscoveryUpdate) {
         self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
             Ok(()) => true,
@@ -855,6 +873,7 @@ impl Discv4Service {
                 }
             }
             (Some(_), None) => {
+                // got an ENR
                 self.send_enr_request(record);
             }
             _ => {}
@@ -949,6 +968,8 @@ impl Discv4Service {
             }
             _ => return false,
         }
+
+        // send the initial ping to the _new_ node
         self.try_ping(record, PingReason::InitialInsert);
         true
     }
@@ -1540,10 +1561,11 @@ impl Discv4Service {
         }
 
         // trigger self lookup
-        if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() {
-            let _ = self.lookup_interval.poll_tick(cx);
-            let target = self.lookup_rotator.next(&self.local_node_record.id);
-            self.lookup_with(target, None);
+        if self.config.enable_lookup {
+            while self.lookup_interval.poll_tick(cx).is_ready() {
+                let target = self.lookup_rotator.next(&self.local_node_record.id);
+                self.lookup_with(target, None);
+            }
         }
 
         // re-ping some peers
@@ -1558,7 +1580,7 @@ impl Discv4Service {
             self.set_external_ip_addr(ip);
         }
 
-        // process all incoming commands, this channel can never close
+        // drain all incoming `Discv4` commands, this channel can never close
         while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
             match cmd {
                 Discv4Command::Add(enr) => {
@@ -1608,6 +1630,9 @@ impl Discv4Service {
             }
         }
 
+        // restricts how many messages we process in a single poll before yielding back control
+        let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
+
         // process all incoming datagrams
         while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
             match event {
@@ -1649,6 +1674,17 @@ impl Discv4Service {
                     self.queued_events.push_back(event);
                 }
             }
+
+            udp_message_budget -= 1;
+            if udp_message_budget < 0 {
+                trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
+                if self.queued_events.is_empty() {
+                    // we've exceeded the message budget and have no events to process
+                    // this will make sure we're woken up again
+                    cx.waker().wake_by_ref();
+                }
+                break
+            }
         }
 
         // try resending buffered pings
@@ -1656,12 +1692,12 @@ impl Discv4Service {
 
         // evict expired nodes
         while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
-            self.evict_expired_requests(Instant::now())
+            self.evict_expired_requests(Instant::now());
         }
 
         // evict expired nodes
         while self.expire_interval.poll_tick(cx).is_ready() {
-            self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION)
+            self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
         }
 
         if self.queued_events.is_empty() {
@@ -1686,6 +1722,20 @@ impl Stream for Discv4Service {
     }
 }
 
+impl fmt::Debug for Discv4Service {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Discv4Service")
+            .field("local_address", &self.local_address)
+            .field("local_peer_id", &self.local_peer_id())
+            .field("local_node_record", &self.local_node_record)
+            .field("queued_pings", &self.queued_pings)
+            .field("pending_lookup", &self.pending_lookup)
+            .field("pending_find_nodes", &self.pending_find_nodes)
+            .field("lookup_interval", &self.lookup_interval)
+            .finish_non_exhaustive()
+    }
+}
+
 /// The Event type the Service stream produces.
 ///
 /// This is mainly used for testing purposes and represents messages the service processed
@@ -1763,7 +1813,7 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
     }
 }
 
-/// The commands sent from the frontend to the service
+/// The commands sent from the frontend [Discv4] to the service [Discv4Service].
 enum Discv4Command {
     Add(NodeRecord),
     SetTcpPort(u16),
@@ -1779,6 +1829,7 @@ enum Discv4Command {
 }
 
 /// Event type receiver produces
+#[derive(Debug)]
 pub(crate) enum IngressEvent {
     /// Encountered an error when reading a datagram message.
     RecvError(io::Error),
@@ -1789,6 +1840,7 @@ pub(crate) enum IngressEvent {
 }
 
 /// Tracks a sent ping
+#[derive(Debug)]
 struct PingRequest {
     // Timestamp when the request was sent.
     sent_at: Instant,
@@ -1953,6 +2005,9 @@ struct LookupContextInner {
     /// The closest nodes
     closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
     /// A listener for all the nodes retrieved in this lookup
+    ///
+    /// This is present if the lookup was triggered manually via [Discv4] and we want to return all
+    /// the nodes once the lookup finishes.
     listener: Option<NodeRecordSender>,
 }
 
@@ -1981,6 +2036,7 @@ struct QueryNode {
     responded: bool,
 }
 
+#[derive(Debug)]
 struct FindNodeRequest {
     // Timestamp when the request was sent.
     sent_at: Instant,
@@ -2000,6 +2056,7 @@ impl FindNodeRequest {
     }
 }
 
+#[derive(Debug)]
 struct EnrRequestState {
     // Timestamp when the request was sent.
     sent_at: Instant,
@@ -2081,6 +2138,7 @@ impl NodeEntry {
 }
 
 /// Represents why a ping is issued
+#[derive(Debug)]
 enum PingReason {
     /// Initial ping to a previously unknown peer that was inserted into the table.
     InitialInsert,
@@ -2089,7 +2147,7 @@ enum PingReason {
     EstablishBond,
     /// Re-ping a peer.
     RePing,
-    /// Part of a lookup to ensure endpoint is proven.
+    /// Part of a lookup to ensure endpoint is proven before we can send a FindNode request.
     Lookup(NodeRecord, LookupContext),
 }
 
@@ -2212,9 +2270,10 @@ mod tests {
         }
     }
 
-    #[tokio::test]
+    // Bootstraps with mainnet boot nodes
+    #[tokio::test(flavor = "multi_thread")]
     #[ignore]
-    async fn test_lookup() {
+    async fn test_mainnet_lookup() {
         reth_tracing::init_test_tracing();
 
         let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
@@ -2396,6 +2455,7 @@ mod tests {
         // done
         assert_eq!(service.pending_find_nodes.len(), 2);
     }
+
     #[tokio::test]
     async fn test_no_local_in_closest() {
         reth_tracing::init_test_tracing();
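The budget introduced in [Discv4Service::poll] is a general cooperative-scheduling pattern: drain a ready source for at most a fixed number of items per poll, then yield and explicitly wake the task so the remaining work is picked up on a later poll instead of starving other tasks on the runtime. Below is a minimal, self-contained sketch of that pattern; `BudgetedDrain`, its message type, and `POLL_LOOP_BUDGET` are illustrative stand-ins, not part of this patch.

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use tokio::sync::mpsc;

/// Illustrative cap on how many messages a single `poll` call may drain.
const POLL_LOOP_BUDGET: i32 = 4;

/// Illustrative future that drains a channel to completion while staying
/// cooperative: it processes at most `POLL_LOOP_BUDGET` messages per poll.
struct BudgetedDrain {
    rx: mpsc::Receiver<Vec<u8>>,
    processed: usize,
}

impl Future for BudgetedDrain {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let mut budget = POLL_LOOP_BUDGET;

        while let Poll::Ready(msg) = this.rx.poll_recv(cx) {
            let Some(_datagram) = msg else {
                // All senders dropped: the drain is complete.
                return Poll::Ready(this.processed);
            };

            // ... handle the message here (the CPU-expensive part) ...
            this.processed += 1;

            budget -= 1;
            if budget < 0 {
                // Budget exhausted: stop draining, but wake ourselves so the
                // executor schedules another poll after other tasks get a turn.
                cx.waker().wake_by_ref();
                break;
            }
        }

        Poll::Pending
    }
}

In the patch itself, the explicit wake-up is only issued when `queued_events` is empty, because yielding a buffered event from the stream already guarantees the service is polled again.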