diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs
index f53c8dd2c1..fd3ef5dfd9 100644
--- a/quinn/src/endpoint.rs
+++ b/quinn/src/endpoint.rs
@@ -29,7 +29,7 @@ use udp::{RecvMeta, BATCH_SIZE};
 
 use crate::{
     connection::Connecting, work_limiter::WorkLimiter, ConnectionEvent, EndpointConfig, VarInt,
-    IO_LOOP_BOUND, MAX_TRANSMIT_QUEUE_CONTENTS_LEN, RECV_TIME_BOUND, SEND_TIME_BOUND,
+    IO_LOOP_BOUND, RECV_TIME_BOUND,
 };
 
 /// A QUIC endpoint.
@@ -334,7 +334,6 @@ impl Future for EndpointDriver {
         let mut keep_going = false;
         keep_going |= endpoint.drive_recv(cx, now)?;
         keep_going |= endpoint.handle_events(cx, &self.0.shared);
-        keep_going |= endpoint.drive_send(cx)?;
 
         if !endpoint.incoming.is_empty() {
             self.0.shared.incoming.notify_waiters();
@@ -376,7 +375,6 @@ pub(crate) struct EndpointInner {
 pub(crate) struct State {
     socket: Box<dyn AsyncUdpSocket>,
     inner: proto::Endpoint,
-    outgoing: VecDeque<udp::Transmit>,
     incoming: VecDeque<Connecting>,
     driver: Option<Waker>,
     ipv6: bool,
@@ -387,10 +385,7 @@ pub(crate) struct State {
     driver_lost: bool,
     recv_limiter: WorkLimiter,
     recv_buf: Box<[u8]>,
-    send_limiter: WorkLimiter,
     runtime: Arc<dyn Runtime>,
-    /// The aggregateed contents length of the packets in the transmit queue
-    transmit_queue_contents_len: usize,
 }
 
 #[derive(Debug)]
@@ -452,22 +447,38 @@ impl State {
                             .send(ConnectionEvent::Proto(event));
                     }
                     Some(DatagramEvent::Response(transmit)) => {
-                        // Limiting the memory usage for items queued in the outgoing queue from endpoint
-                        // generated packets. Otherwise, we may see a build-up of the queue under test with
-                        // flood of initial packets against the endpoint. The sender with the sender-limiter
-                        // may not keep up the pace of these packets queued into the queue.
-                        if self.transmit_queue_contents_len
-                            < MAX_TRANSMIT_QUEUE_CONTENTS_LEN
-                        {
-                            let contents_len = transmit.size;
-                            self.outgoing.push_back(udp_transmit(
+                        // Send if there's kernel buffer space; otherwise, drop it
+                        //
+                        // As an endpoint-generated packet, we know this is an
+                        // immediate, stateless response to an unconnected peer,
+                        // one of:
+                        //
+                        // - A version negotiation response due to an unknown version
+                        // - A `CLOSE` due to a malformed or unwanted connection attempt
+                        // - A stateless reset due to an unrecognized connection
+                        // - A `Retry` packet due to a connection attempt when
+                        //   `use_retry` is set
+                        //
+                        // In each case, a well-behaved peer can be trusted to retry a
+                        // few times, which is guaranteed to produce the same response
+                        // from us. Repeated failures might at worst cause a peer's new
+                        // connection attempt to time out, which is acceptable if we're
+                        // under such heavy load that there's never room for this code
+                        // to transmit. This is morally equivalent to the packet getting
+                        // lost due to congestion further along the link, which
+                        // similarly relies on peer retries for recovery.
+                        //
+                        // TODO: Pass a noop waker
+                        // (https://github.com/rust-lang/rust/issues/98286) so we don't
+                        // get spuriously woken after dropping a datagram on purpose.
+                        let contents_len = transmit.size;
+                        _ = self.socket.poll_send(
+                            cx,
+                            &[udp_transmit(
                                 transmit,
                                 buffer.split_to(contents_len).freeze(),
-                            ));
-                            self.transmit_queue_contents_len = self
-                                .transmit_queue_contents_len
-                                .saturating_add(contents_len);
-                        }
+                            )],
+                        );
                     }
                     None => {}
                 }
@@ -496,42 +507,6 @@ impl State {
         Ok(false)
     }
 
-    fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
-        self.send_limiter.start_cycle();
-
-        let result = loop {
-            if self.outgoing.is_empty() {
-                break Ok(false);
-            }
-
-            if !self.send_limiter.allow_work() {
-                break Ok(true);
-            }
-
-            match self.socket.poll_send(cx, self.outgoing.as_slices().0) {
-                Poll::Ready(Ok(n)) => {
-                    let contents_len: usize =
-                        self.outgoing.drain(..n).map(|t| t.contents.len()).sum();
-                    self.transmit_queue_contents_len = self
-                        .transmit_queue_contents_len
-                        .saturating_sub(contents_len);
-                    // We count transmits instead of `poll_send` calls since the cost
-                    // of a `sendmmsg` still linearly increases with number of packets.
-                    self.send_limiter.record_work(n);
-                }
-                Poll::Pending => {
-                    break Ok(false);
-                }
-                Poll::Ready(Err(e)) => {
-                    break Err(e);
-                }
-            }
-        };
-
-        self.send_limiter.finish_cycle();
-        result
-    }
-
     fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {
         for _ in 0..IO_LOOP_BOUND {
             let (ch, event) = match self.events.poll_recv(cx) {
@@ -677,7 +652,6 @@ impl EndpointRef {
                 inner,
                 ipv6,
                 events,
-                outgoing: VecDeque::new(),
                 incoming: VecDeque::new(),
                 driver: None,
                 connections: ConnectionSet {
@@ -689,9 +663,7 @@ impl EndpointRef {
                 driver_lost: false,
                 recv_buf: recv_buf.into(),
                 recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
-                send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
                 runtime,
-                transmit_queue_contents_len: 0,
             }),
         }))
     }
diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs
index 6fa602f2c3..86886b5e9a 100644
--- a/quinn/src/lib.rs
+++ b/quinn/src/lib.rs
@@ -110,14 +110,6 @@ const IO_LOOP_BOUND: usize = 160;
 /// batch of size 32 was observed to take 30us on some systems.
 const RECV_TIME_BOUND: Duration = Duration::from_micros(50);
 
-/// The maximum amount of time that should be spent in `sendmsg()` calls per endpoint iteration
-const SEND_TIME_BOUND: Duration = Duration::from_micros(50);
-
-/// The maximum size of content length of packets in the outgoing transmit queue. Transmit packets
-/// generated from the endpoint (retry or initial close) can be dropped when this limit is being execeeded.
-/// Chose to represent 100 MB of data.
-const MAX_TRANSMIT_QUEUE_CONTENTS_LEN: usize = 100_000_000;
-
 fn udp_transmit(t: proto::Transmit, buffer: Bytes) -> udp::Transmit {
     udp::Transmit {
         destination: t.destination,
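The TODO in the `DatagramEvent::Response` arm refers to passing a no-op waker so the endpoint task is not spuriously woken after a datagram is deliberately dropped. The sketch below is not part of the patch: it is a standalone illustration of the usual hand-rolled stand-in until `Waker::noop()` (rust-lang/rust#98286) stabilizes, built from a `RawWaker` whose vtable entries do nothing. The `raw_noop`/`NOOP_VTABLE` names and the toy `main` are illustrative assumptions, not quinn code; the idea is that a `Context::from_waker(&noop)` built this way could be handed to `poll_send` instead of the I/O driver's `cx`.

    use std::future::Future;
    use std::pin::pin;
    use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

    // Vtable whose entries do nothing; cloning just produces another no-op RawWaker.
    static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_raw);

    fn raw_noop() -> RawWaker {
        // The data pointer is never read by the vtable functions, so null is fine.
        RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
    }

    unsafe fn clone(_: *const ()) -> RawWaker {
        raw_noop()
    }
    unsafe fn wake(_: *const ()) {}
    unsafe fn wake_by_ref(_: *const ()) {}
    unsafe fn drop_raw(_: *const ()) {}

    fn main() {
        // SAFETY: every vtable entry ignores its data pointer and does nothing,
        // which trivially satisfies the RawWaker contract.
        let waker = unsafe { Waker::from_raw(raw_noop()) };
        let mut cx = Context::from_waker(&waker);

        // Polling with this context registers a waker that never fires: if the
        // poll returns Pending, the caller is never woken again, which is the
        // fire-and-forget behaviour wanted when a stateless response is dropped.
        let mut fut = pin!(std::future::ready("response went out"));
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(msg) => println!("{msg}"),
            Poll::Pending => println!("dropped; the peer's retry will regenerate it"),
        }
    }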