Skip to content

Commit

Permalink
Update quinn and rustls
Browse files Browse the repository at this point in the history
rustls: 0.20.6 -> 0.21.7
quinn: 0.8.3 -> 0.10.2
  • Loading branch information
patowen committed Oct 27, 2023
1 parent 40a5a1e commit a3a6704
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ members = ["client", "server", "common", "save", "save/gen-protos"]
[workspace.dependencies]
hecs = "0.10.0"
nalgebra = { version = "0.32.1", features = ["libm-force"] }
quinn = "0.8.3"
quinn = "0.10.2"
toml = { version = "0.8.0", default-features = false, features = ["parse"] }

[profile.dev]
Expand Down
2 changes: 1 addition & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fxhash = "0.2.1"
downcast-rs = "1.1.1"
quinn = { workspace = true }
futures-util = "0.3.1"
rustls = { version = "0.20.6", features = ["dangerous_configuration"] }
rustls = { version = "0.21.7", features = ["dangerous_configuration"] }
webpki = "0.22.0"
hecs = { workspace = true }
rcgen = { version = "0.11.0", default-features = false }
Expand Down
49 changes: 27 additions & 22 deletions client/src/net.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{sync::Arc, thread};

use anyhow::{anyhow, Error, Result};
use futures_util::{StreamExt, TryStreamExt};
use tokio::sync::mpsc;

use common::{codec, proto};
Expand Down Expand Up @@ -63,16 +62,12 @@ async fn inner(
endpoint: quinn::Endpoint,
) -> Result<()> {
let server = cfg.server.unwrap();
let quinn::NewConnection {
connection,
mut uni_streams,
..
} = endpoint.connect(server, "localhost").unwrap().await?;
let connection = endpoint.connect(server, "localhost").unwrap().await?;

// Open the first stream for our hello message
let clienthello_stream = connection.open_uni().await?;
// Start sending commands asynchronously
tokio::spawn(handle_outgoing(outgoing, connection));
tokio::spawn(handle_outgoing(outgoing, connection.clone()));
// Actually send the hello message
codec::send_whole(
clienthello_stream,
Expand All @@ -82,9 +77,9 @@ async fn inner(
)
.await?;

let mut ordered = uni_streams.next().await.unwrap()?;
let mut ordered = connection.accept_uni().await?;
// Handle unordered messages
tokio::spawn(handle_unordered(incoming.clone(), uni_streams));
tokio::spawn(handle_unordered(incoming.clone(), connection));

// Receive the server's hello message
let hello = codec::recv::<proto::ServerHello>(&mut ordered)
Expand Down Expand Up @@ -118,21 +113,31 @@ async fn handle_outgoing(
/// Receive unordered messages from the server
async fn handle_unordered(
incoming: mpsc::UnboundedSender<Message>,
uni_streams: quinn::IncomingUniStreams,
connection: quinn::Connection,
) -> Result<()> {
let mut msgs = uni_streams
.map(|stream| async {
let stream = stream?;
codec::recv_whole::<proto::StateDelta>(2usize.pow(16), stream).await
})
.buffer_unordered(128);
// TODO: Don't silently die on parse errors
while let Some(msg) = msgs.try_next().await? {
// Ignore errors so we don't panic if the simulation thread goes away between checking
// `msgs` and here.
let _ = incoming.send(Message::StateDelta(msg));
loop {
let stream = connection.accept_uni().await;
let stream = match stream {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
return Ok(());
}
Err(e) => {
return Err(e.into());
}
Ok(s) => s,
};
let incoming = incoming.clone();
tokio::spawn(async move {
match codec::recv_whole::<proto::StateDelta>(2usize.pow(16), stream).await {
Err(e) => {
let _ = incoming.send(Message::ConnectionLost(e));
}
Ok(msg) => {
let _ = incoming.send(Message::StateDelta(msg));
}
}
});
}
Ok(())
}

struct AcceptAnyCert;
Expand Down
2 changes: 1 addition & 1 deletion common/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub async fn send_whole<T: Serialize + ?Sized>(
/// Receive the entirety of `stream` as a `T`
pub async fn recv_whole<T: DeserializeOwned>(
size_limit: usize,
stream: quinn::RecvStream,
mut stream: quinn::RecvStream,
) -> Result<T> {
let buf = stream.read_to_end(size_limit).await?;
Ok(bincode::deserialize(&buf)?)
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ fxhash = "0.2.1"
nalgebra = { workspace = true }
libm = "0.2.6"
slotmap = "1.0.6"
rustls = "0.20.6"
rustls = "0.21.7"
rustls-pemfile = "1.0.0"
save = { path = "../save" }
85 changes: 51 additions & 34 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod sim;
use std::{net::UdpSocket, sync::Arc, time::Instant};

use anyhow::{Context, Error, Result};
use futures::{select, StreamExt, TryStreamExt};
use futures::{select, StreamExt};
use hecs::Entity;
use slotmap::DenseSlotMap;
use tokio::sync::mpsc;
Expand All @@ -32,15 +32,16 @@ pub async fn run(net: NetParams, mut sim: SimConfig, save: Save) -> Result<()> {
let server_config =
quinn::ServerConfig::with_single_cert(net.certificate_chain, net.private_key)
.context("parsing certificate")?;
let (endpoint, incoming) = quinn::Endpoint::new(
let endpoint = quinn::Endpoint::new(
quinn::EndpointConfig::default(),
Some(server_config),
net.socket,
quinn::default_runtime().unwrap(),
)?;
info!(address = %endpoint.local_addr().unwrap(), "listening");

let server = Server::new(sim, save);
server.run(incoming).await;
server.run(endpoint).await;
Ok(())
}

Expand All @@ -62,11 +63,9 @@ impl Server {
}
}

async fn run(mut self, incoming: quinn::Incoming) {
async fn run(mut self, endpoint: quinn::Endpoint) {
let mut ticks = IntervalStream::new(tokio::time::interval(self.cfg.step_interval)).fuse();
let mut incoming = incoming
.inspect(|x| trace!(address = %x.remote_address(), "connection incoming"))
.buffer_unordered(16);
let mut incoming = ReceiverStream::new(self.handle_incoming(endpoint)).fuse();
let (client_events_send, client_events) = mpsc::channel(128);
let mut client_events = ReceiverStream::new(client_events).fuse();
loop {
Expand All @@ -78,6 +77,27 @@ impl Server {
}
}

fn handle_incoming(&self, endpoint: quinn::Endpoint) -> mpsc::Receiver<quinn::Connection> {
let (incoming_send, incoming_recv) = mpsc::channel(16);
tokio::spawn(async move {
while let Some(conn) = endpoint.accept().await {
trace!(address = %conn.remote_address(), "connection incoming");
let incoming_send = incoming_send.clone();
tokio::spawn(async move {
match conn.await {
Err(e) => {
error!("incoming connection failed: {}", e.to_string());
}
Ok(connection) => {
let _ = incoming_send.send(connection).await;
}
}
});
}
});
incoming_recv
}

fn on_step(&mut self) {
let now = Instant::now();
// Apply queued inputs
Expand Down Expand Up @@ -186,24 +206,13 @@ impl Server {

fn on_connect(
&mut self,
conn: Result<quinn::NewConnection, quinn::ConnectionError>,
connection: quinn::Connection,
mut send: mpsc::Sender<(ClientId, ClientEvent)>,
) {
let quinn::NewConnection {
connection,
uni_streams,
..
} = match conn {
Ok(x) => x,
Err(e) => {
error!("incoming connection failed: {}", e);
return;
}
};
let id = self.clients.insert(Client::new(connection.clone()));
info!(id = ?id.0, address = %connection.remote_address(), "connection established");
tokio::spawn(async move {
if let Err(e) = drive_recv(id, uni_streams, &mut send).await {
if let Err(e) = drive_recv(id, connection, &mut send).await {
let _ = send.send((id, ClientEvent::Lost(e))).await;
}
});
Expand All @@ -214,26 +223,34 @@ const MAX_CLIENT_MSG_SIZE: usize = 1 << 16;

async fn drive_recv(
id: ClientId,
mut streams: quinn::IncomingUniStreams,
connection: quinn::Connection,
send: &mut mpsc::Sender<(ClientId, ClientEvent)>,
) -> Result<()> {
let hello = match streams.next().await {
None => return Ok(()),
Some(stream) => {
codec::recv_whole::<proto::ClientHello>(MAX_CLIENT_MSG_SIZE, stream?).await?
}
let hello = match connection.accept_uni().await {
Err(quinn::ConnectionError::ApplicationClosed(_)) => return Ok(()),
Err(e) => return Err(e.into()),
Ok(stream) => codec::recv_whole::<proto::ClientHello>(MAX_CLIENT_MSG_SIZE, stream).await?,
};
let _ = send.send((id, ClientEvent::Hello(hello))).await;

let mut cmds = streams
.map(|stream| async {
codec::recv_whole::<proto::Command>(MAX_CLIENT_MSG_SIZE, stream?).await
})
.buffer_unordered(16); // Allow a modest amount of out-of-order completion
while let Some(msg) = cmds.try_next().await? {
let _ = send.send((id, ClientEvent::Command(msg))).await;
loop {
let stream = match connection.accept_uni().await {
Err(quinn::ConnectionError::ApplicationClosed(_)) => return Ok(()),
Err(e) => return Err(e.into()),
Ok(stream) => stream,
};
let send = send.clone();
tokio::spawn(async move {
match codec::recv_whole::<proto::Command>(MAX_CLIENT_MSG_SIZE, stream).await {
Err(e) => {
let _ = send.send((id, ClientEvent::Lost(e))).await;
}
Ok(msg) => {
let _ = send.send((id, ClientEvent::Command(msg))).await;
}
}
});
}
Ok(())
}

async fn drive_send(
Expand Down

0 comments on commit a3a6704

Please sign in to comment.