Skip to content

Commit

Permalink
Strip down ConnectionEvent to reflect its sole remaining purpose
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed May 29, 2023
1 parent 282b5c7 commit 23afc97
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 74 deletions.
90 changes: 42 additions & 48 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
frame::{Close, Datagram, FrameStruct},
packet::{Header, LongType, Packet, PartialDecode, SpaceId},
range_set::ArrayRangeSet,
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint},
shared::{ConnectionDatagram, ConnectionId, EcnCodepoint},
token::ResetToken,
transport_parameters::TransportParameters,
ConnectionHandle, Dir, Endpoint, EndpointConfig, Frame, Side, StreamId, Transmit,
Expand Down Expand Up @@ -81,7 +81,7 @@ use timer::{Timer, TimerTable};

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit application [`Event`]s to make
/// Objects of this type receive [`ConnectionDatagram`]s and emit application [`Event`]s to make
/// progress. To handle timeouts, a `Connection` returns timer updates and expects timeouts through
/// various methods. A number of simple getter methods are exposed to allow callers to inspect some
/// of the connection state.
Expand Down Expand Up @@ -112,11 +112,10 @@ use timer::{Timer, TimerTable};
///
/// (A) may be called whenever desired.
///
/// Care should be made to ensure that the input events represent monotonically
/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
/// with events of the same [`Instant`] may be interleaved in any order with a
/// call to [`handle_event`](Self::handle_event) at that same instant; however
/// events or timeouts with different instants must not be interleaved.
/// Care should be made to ensure that the input events represent monotonically increasing
/// time. Specifically, calling [`handle_timeout`](Self::handle_timeout) with events of the same
/// [`Instant`] may be interleaved in any order with a call to [`handle`](Self::handle) at that same
/// instant; however events or timeouts with different instants must not be interleaved.
pub struct Connection {
handle: ConnectionHandle,
endpoint_config: Arc<EndpointConfig>,
Expand Down Expand Up @@ -873,56 +872,51 @@ impl Connection {
SendableFrames::empty()
}

/// Process `ConnectionEvent`s generated by the associated `Endpoint`
/// Process an incoming [`ConnectionDatagram`] forwarded from the associated `Endpoint`
///
/// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
/// (including application `Event`s and outgoing datagrams) that should be extracted through the
/// relevant methods.
pub fn handle_event(&mut self, event: ConnectionEvent, endpoint: &Endpoint) {
use self::ConnectionEventInner::*;
match event.0 {
Datagram {
now,
remote,
ecn,
first_decode,
remaining,
} => {
// If this packet could initiate a migration and we're a client or a server that
// forbids migration, drop the datagram. This could be relaxed to heuristically
// permit NAT-rebinding-like migration.
if remote != self.path.remote
&& self.server_config.as_ref().map_or(true, |x| !x.migration)
{
trace!("discarding packet from unrecognized peer {}", remote);
return;
}
pub fn handle(&mut self, datagram: ConnectionDatagram, endpoint: &Endpoint) {
let ConnectionDatagram {
now,
remote,
ecn,
first_decode,
remaining,
} = datagram;
// If this packet could initiate a migration and we're a client or a server that
// forbids migration, drop the datagram. This could be relaxed to heuristically
// permit NAT-rebinding-like migration.
if remote != self.path.remote && self.server_config.as_ref().map_or(true, |x| !x.migration)
{
trace!("discarding packet from unrecognized peer {}", remote);
return;
}

let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);

self.stats.udp_rx.datagrams += 1;
self.stats.udp_rx.bytes += first_decode.len() as u64;
let data_len = first_decode.len();
self.stats.udp_rx.datagrams += 1;
self.stats.udp_rx.bytes += first_decode.len() as u64;
let data_len = first_decode.len();

self.handle_decode(now, remote, ecn, first_decode, endpoint);
// The current `path` might have changed inside `handle_decode`,
// since the packet could have triggered a migration. Make sure
// the data received is accounted for the most recent path by accessing
// `path` after `handle_decode`.
self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
self.handle_decode(now, remote, ecn, first_decode, endpoint);
// The current `path` might have changed inside `handle_decode`,
// since the packet could have triggered a migration. Make sure
// the data received is accounted for the most recent path by accessing
// `path` after `handle_decode`.
self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);

if let Some(data) = remaining {
self.stats.udp_rx.bytes += data.len() as u64;
self.handle_coalesced(now, remote, ecn, data, endpoint);
}
if let Some(data) = remaining {
self.stats.udp_rx.bytes += data.len() as u64;
self.handle_coalesced(now, remote, ecn, data, endpoint);
}

if was_anti_amplification_blocked {
// A prior attempt to set the loss detection timer may have failed due to
// anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
// the server's first flight is lost.
self.set_loss_detection_timer(now);
}
}
if was_anti_amplification_blocked {
// A prior attempt to set the loss detection timer may have failed due to
// anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
// the server's first flight is lost.
self.set_loss_detection_timer(now);
}
}

Expand Down
8 changes: 4 additions & 4 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
crypto::{self, Keys, UnsupportedVersion},
frame,
packet::{Header, Packet, PacketDecodeError, PacketNumber, PartialDecode},
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, IssuedCid},
shared::{ConnectionDatagram, ConnectionId, EcnCodepoint, IssuedCid},
transport_parameters::TransportParameters,
ResetToken, RetryToken, Side, Transmit, TransportConfig, TransportError, INITIAL_MTU,
MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE,
Expand Down Expand Up @@ -169,13 +169,13 @@ impl Endpoint {
if let Some(ch) = self.index.read().unwrap().get(&addresses, &first_decode) {
return Some(DatagramEvent::ConnectionEvent(
ch,
ConnectionEvent(ConnectionEventInner::Datagram {
ConnectionDatagram {
now,
remote: addresses.remote,
ecn,
first_decode,
remaining,
}),
},
));
}

Expand Down Expand Up @@ -852,7 +852,7 @@ impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
#[allow(clippy::large_enum_variant)] // Not passed around extensively
pub enum DatagramEvent {
/// The datagram is redirected to its `Connection`
ConnectionEvent(ConnectionHandle, ConnectionEvent),
ConnectionEvent(ConnectionHandle, ConnectionDatagram),
/// The datagram has resulted in starting a new `Connection`
NewConnection(ConnectionHandle, Connection),
/// Response generated directly by the endpoint
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod endpoint;
pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint};

mod shared;
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint};
pub use crate::shared::{ConnectionDatagram, ConnectionId, EcnCodepoint};

mod transport_error;
pub use crate::transport_error::{Code as TransportErrorCode, Error as TransportError};
Expand Down
21 changes: 8 additions & 13 deletions quinn-proto/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,15 @@ use bytes::{Buf, BufMut, BytesMut};

use crate::{coding::BufExt, packet::PartialDecode, ResetToken, MAX_CID_SIZE};

/// Events sent from an Endpoint to a Connection
/// UDP datagram addressed to a specific
/// [`Connection`](crate::Connection)
#[derive(Debug)]
pub struct ConnectionEvent(pub(crate) ConnectionEventInner);

#[derive(Debug)]
pub(crate) enum ConnectionEventInner {
/// A datagram has been received for the Connection
Datagram {
now: Instant,
remote: SocketAddr,
ecn: Option<EcnCodepoint>,
first_decode: PartialDecode,
remaining: Option<BytesMut>,
},
pub struct ConnectionDatagram {
pub(crate) now: Instant,
pub(crate) remote: SocketAddr,
pub(crate) ecn: Option<EcnCodepoint>,
pub(crate) first_decode: PartialDecode,
pub(crate) remaining: Option<BytesMut>,
}

/// Protocol-level identifier for a connection.
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn version_negotiate_client() {
.into(),
);
if let Some(DatagramEvent::ConnectionEvent(_, event)) = opt_event {
client_ch.handle_event(event, &client);
client_ch.handle(event, &client);
}
assert_matches!(
client_ch.poll(),
Expand Down
4 changes: 2 additions & 2 deletions quinn-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub(super) struct TestEndpoint {
pub(super) inbound: VecDeque<(Instant, Option<EcnCodepoint>, BytesMut)>,
accepted: Option<ConnectionHandle>,
pub(super) connections: HashMap<ConnectionHandle, Connection>,
conn_events: HashMap<ConnectionHandle, VecDeque<ConnectionEvent>>,
conn_events: HashMap<ConnectionHandle, VecDeque<ConnectionDatagram>>,
}

impl TestEndpoint {
Expand Down Expand Up @@ -337,7 +337,7 @@ impl TestEndpoint {

for (_, mut events) in self.conn_events.drain() {
for event in events.drain(..) {
conn.handle_event(event, &self.endpoint);
conn.handle(event, &self.endpoint);
}
}

Expand Down
4 changes: 2 additions & 2 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,8 +906,8 @@ impl State {
Poll::Ready(Some(ConnectionEvent::Ping)) => {
self.inner.ping();
}
Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
self.inner.handle_event(event, &shared.endpoint);
Poll::Ready(Some(ConnectionEvent::Datagram(datagram))) => {
self.inner.handle(datagram, &shared.endpoint);
}
Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
self.close(error_code, reason, shared);
Expand Down
4 changes: 2 additions & 2 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,14 @@ impl State {
);
self.incoming.push_back(conn);
}
Some(DatagramEvent::ConnectionEvent(handle, event)) => {
Some(DatagramEvent::ConnectionEvent(handle, datagram)) => {
// Ignoring errors from dropped connections that haven't yet been cleaned up
let _ = self
.connections
.senders
.get_mut(&handle)
.unwrap()
.send(ConnectionEvent::Proto(event));
.send(ConnectionEvent::Datagram(datagram));
}
Some(DatagramEvent::Response(t)) => {
self.outgoing.push_back(udp_transmit(t));
Expand Down
2 changes: 1 addition & 1 deletion quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ enum ConnectionEvent {
error_code: VarInt,
reason: bytes::Bytes,
},
Proto(proto::ConnectionEvent),
Datagram(proto::ConnectionDatagram),
Ping,
}

Expand Down

0 comments on commit 23afc97

Please sign in to comment.