Skip to content

Commit

Permalink
Fix #275
Browse files Browse the repository at this point in the history
  • Loading branch information
c410-f3r committed Nov 26, 2024
1 parent 1e2e572 commit a66f432
Show file tree
Hide file tree
Showing 24 changed files with 962 additions and 575 deletions.
6 changes: 3 additions & 3 deletions wtx-docs/src/web-socket/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Implementation of [RFC6455](https://datatracker.ietf.org/doc/html/rfc6455) and [RFC7692](https://datatracker.ietf.org/doc/html/rfc7692). WebSocket is a communication protocol that enables full-duplex communication between a client (typically a web browser) and a server over a single TCP connection. Unlike traditional HTTP, which is request-response based, WebSocket allows real-time data exchange without the need for polling.

In-house benchmarks are available at <https://c410-f3r.github.io/wtx-bench>. If you are aware of other benchmark tools, please open an discussion in the GitHub project.
In-house benchmarks are available at <https://c410-f3r.github.io/wtx-bench>. If you are aware of other benchmark tools, please open a discussion in the GitHub project.

To use this functionality, it is necessary to activate the `web-socket` feature.

Expand All @@ -28,11 +28,11 @@ To make everything work as intended both parties, client and server, need to imp
## Client Example

```rust,edition2021,no_run
{{#rustdoc_include ../../../wtx-instances/generic-examples/web-socket-client.rs}}
{{#rustdoc_include ../../../wtx-instances/web-socket-examples/web-socket-client.rs}}
```

## Server Example

```rust,edition2021,no_run
{{#rustdoc_include ../../../wtx-instances/generic-examples/web-socket-server.rs}}
{{#rustdoc_include ../../../wtx-instances/web-socket-examples/web-socket-server.rs}}
```
29 changes: 18 additions & 11 deletions wtx-instances/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ required-features = ["grpc"]
[[example]]
name = "grpc-server"
path = "generic-examples/grpc-server.rs"
required-features = ["grpc", "wtx/tokio-rustls", "wtx/webpki-roots"]
required-features = ["grpc", "wtx/tokio-rustls"]

[[example]]
name = "http-client-framework"
Expand All @@ -69,16 +69,6 @@ name = "pool"
path = "generic-examples/pool.rs"
required-features = ["wtx/pool"]

[[example]]
name = "web-socket-client"
path = "generic-examples/web-socket-client.rs"
required-features = ["wtx/web-socket-handshake", "wtx/webpki-roots"]

[[example]]
name = "web-socket-server"
path = "generic-examples/web-socket-server.rs"
required-features = ["tokio-rustls", "wtx/pool", "wtx/tokio-rustls", "wtx/web-socket-handshake"]

# HTTP Server Framework Examples

[[example]]
Expand Down Expand Up @@ -118,6 +108,23 @@ name = "http2-web-socket"
path = "http2-examples/http2-web-socket.rs"
required-features = ["wtx/http2", "wtx/tokio-rustls", "wtx/web-socket"]

# WebSocket Examples

[[example]]
name = "web-socket-client"
path = "web-socket-examples/web-socket-client.rs"
required-features = ["wtx/web-socket-handshake"]

[[example]]
name = "web-socket-concurrent-client"
path = "web-socket-examples/web-socket-concurrent-client.rs"
required-features = ["wtx/tokio-rustls", "wtx/web-socket-handshake", "wtx/webpki-roots"]

[[example]]
name = "web-socket-server"
path = "web-socket-examples/web-socket-server.rs"
required-features = ["tokio-rustls", "wtx/pool", "wtx/tokio-rustls", "wtx/web-socket-handshake"]

[build-dependencies]
pb-rs = { default-features = false, optional = true, version = "0.10" }

Expand Down
88 changes: 44 additions & 44 deletions wtx-instances/generic-examples/client-api-framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,52 +80,52 @@ mod generic_web_socket_subscription {
pub type GenericWebSocketSubscriptionRes = u64;
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
async fn http_pair(
) -> Pair<PkgsAux<GenericThrottlingApi, SerdeJson, HttpParams>, ClientFrameworkTokio> {
Pair::new(
PkgsAux::from_minimum(
GenericThrottlingApi {
rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))),
},
SerdeJson,
HttpParams::from_uri("ws://generic_web_socket_uri.com".into()),
),
ClientFrameworkTokio::tokio(1).build(),
)
}
async fn http_pair(
) -> Pair<PkgsAux<GenericThrottlingApi, SerdeJson, HttpParams>, ClientFrameworkTokio> {
Pair::new(
PkgsAux::from_minimum(
GenericThrottlingApi {
rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))),
},
SerdeJson,
HttpParams::from_uri("ws://generic_web_socket_uri.com".into()),
),
ClientFrameworkTokio::tokio(1).build(),
)
}

async fn web_socket_pair() -> wtx::Result<
Pair<
PkgsAux<GenericThrottlingApi, SerdeJson, WsParams>,
WebSocketClient<(), TcpStream, WebSocketBuffer>,
>,
> {
let uri = Uri::new("ws://generic_web_socket_uri.com");
let web_socket = WebSocketClient::connect(
(),
[],
false,
Xorshift64::from(simple_seed()),
TcpStream::connect(uri.hostname_with_implied_port()).await?,
&uri,
WebSocketBuffer::default(),
|_| wtx::Result::Ok(()),
)
.await?;
Ok(Pair::new(
PkgsAux::from_minimum(
GenericThrottlingApi {
rt: RequestThrottling::from_rl(RequestLimit::new(40, Duration::from_secs(2))),
},
SerdeJson,
WsParams::default(),
),
web_socket,
))
}
async fn web_socket_pair() -> wtx::Result<
Pair<
PkgsAux<GenericThrottlingApi, SerdeJson, WsParams>,
WebSocketClient<(), TcpStream, WebSocketBuffer>,
>,
> {
let uri = Uri::new("ws://generic_web_socket_uri.com");
let web_socket = WebSocketClient::connect(
(),
[],
false,
Xorshift64::from(simple_seed()),
TcpStream::connect(uri.hostname_with_implied_port()).await?,
&uri,
WebSocketBuffer::default(),
|_| wtx::Result::Ok(()),
)
.await?;
Ok(Pair::new(
PkgsAux::from_minimum(
GenericThrottlingApi {
rt: RequestThrottling::from_rl(RequestLimit::new(40, Duration::from_secs(2))),
},
SerdeJson,
WsParams::default(),
),
web_socket,
))
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
let mut hp = http_pair().await;
let _http_response_tuple = hp
.trans
Expand Down
2 changes: 1 addition & 1 deletion wtx-instances/src/bin/autobahn-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() -> wtx::Result<()> {
|_| wtx::Result::Ok(()),
)
.await?;
let (mut common, mut reader, mut writer) = ws.parts();
let (mut common, mut reader, mut writer) = ws.parts_mut();
loop {
let mut frame = match reader.read_frame(&mut common).await {
Err(_err) => {
Expand Down
2 changes: 1 addition & 1 deletion wtx-instances/src/bin/autobahn-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> wtx::Result<()> {
async fn handle(
mut ws: WebSocketServer<Option<NegotiatedFlate2>, TcpStream, &mut WebSocketBuffer>,
) -> wtx::Result<()> {
let (mut common, mut reader, mut writer) = ws.parts();
let (mut common, mut reader, mut writer) = ws.parts_mut();
loop {
let mut frame = reader.read_frame(&mut common).await?;
match frame.op_code() {
Expand Down
50 changes: 50 additions & 0 deletions wtx-instances/web-socket-examples/web-socket-concurrent-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! WebSocket client that reads and writes frames in different tasks.
extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use tokio::{net::TcpStream, sync::Mutex};
use wtx::{
misc::{simple_seed, Arc, TokioRustlsConnector, Uri, Xorshift64},
web_socket::{Frame, OpCode, WebSocketBuffer, WebSocketClient},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
let uri = Uri::new("ws://www.example.com");
let connector = TokioRustlsConnector::from_auto()?;
let stream = TcpStream::connect(uri.hostname_with_implied_port()).await?;
let ws = WebSocketClient::connect(
(),
[],
false,
Xorshift64::from(simple_seed()),
connector.connect_without_client_auth(uri.hostname(), stream).await?,
&uri.to_ref(),
WebSocketBuffer::default(),
|_| wtx::Result::Ok(()),
)
.await?;
let (mut reader, mut writer) = ws.into_parts::<Arc<Mutex<_>>, _, _>(|el| tokio::io::split(el));
let reader_jh = tokio::spawn(async move {
loop {
let frame = reader.read_frame().await?;
match (frame.op_code(), frame.text_payload()) {
(_, Some(elem)) => println!("{elem}"),
(OpCode::Close, _) => break,
_ => {}
}
}
wtx::Result::Ok(())
});
let writer_jh = tokio::spawn(async move {
writer.write_frame(&mut Frame::new_fin(OpCode::Text, *b"Hi and Bye")).await?;
writer.write_frame(&mut Frame::new_fin(OpCode::Close, [])).await?;
wtx::Result::Ok(())
});
let (reader_rslt, writer_rslt) = tokio::join!(reader_jh, writer_jh);
reader_rslt??;
writer_rslt??;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> wtx::Result<()> {
async fn handle(
mut ws: WebSocketServer<(), TlsStream<TcpStream>, &mut WebSocketBuffer>,
) -> wtx::Result<()> {
let (mut common, mut reader, mut writer) = ws.parts();
let (mut common, mut reader, mut writer) = ws.parts_mut();
loop {
let mut frame = reader.read_frame(&mut common).await?;
match frame.op_code() {
Expand Down
2 changes: 1 addition & 1 deletion wtx/src/http2/web_socket_over_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ where
Http2RecvStatus::Ongoing(data) => (data, false),
};
let mut slice = data.as_slice();
let rfi = ReadFrameInfo::from_bytes::<_, false>(&mut slice, usize::MAX, &(), no_masking)?;
let rfi = ReadFrameInfo::from_bytes::<false>(&mut slice, usize::MAX, (true, 0), no_masking)?;
let before = buffer.len();
buffer.extend_from_copyable_slice(slice)?;
unmask_nb::<false>(buffer.get_mut(before..).unwrap_or_default(), no_masking, &rfi)?;
Expand Down
6 changes: 3 additions & 3 deletions wtx/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,14 @@ where
}

#[inline]
pub(crate) async fn _read_payload<S>(
pub(crate) async fn _read_payload<SR>(
(header_len, payload_len): (usize, usize),
network_buffer: &mut PartitionedFilledBuffer,
read: &mut usize,
stream: &mut S,
stream: &mut SR,
) -> crate::Result<()>
where
S: StreamReader,
SR: StreamReader,
{
let frame_len = header_len.wrapping_add(payload_len);
network_buffer._reserve(frame_len)?;
Expand Down
16 changes: 16 additions & 0 deletions wtx/src/misc/connection_state.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::misc::{Lease, LeaseMut};

/// The state of a connection between two parties.
#[derive(Clone, Copy, Debug)]
pub enum ConnectionState {
Expand All @@ -21,6 +23,20 @@ impl ConnectionState {
}
}

impl Lease<ConnectionState> for ConnectionState {
#[inline]
fn lease(&self) -> &ConnectionState {
self
}
}

impl LeaseMut<ConnectionState> for ConnectionState {
#[inline]
fn lease_mut(&mut self) -> &mut ConnectionState {
self
}
}

impl From<bool> for ConnectionState {
#[inline]
fn from(from: bool) -> Self {
Expand Down
16 changes: 15 additions & 1 deletion wtx/src/misc/partitioned_filled_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::misc::{FilledBuffer, FilledBufferWriter, VectorError};
use crate::misc::{FilledBuffer, FilledBufferWriter, Lease, LeaseMut, VectorError};
use core::ops::Range;

// ```
Expand Down Expand Up @@ -174,6 +174,20 @@ impl PartitionedFilledBuffer {
}
}

impl Lease<PartitionedFilledBuffer> for PartitionedFilledBuffer {
#[inline]
fn lease(&self) -> &PartitionedFilledBuffer {
self
}
}

impl LeaseMut<PartitionedFilledBuffer> for PartitionedFilledBuffer {
#[inline]
fn lease_mut(&mut self) -> &mut PartitionedFilledBuffer {
self
}
}

impl Default for PartitionedFilledBuffer {
#[inline]
fn default() -> Self {
Expand Down
16 changes: 15 additions & 1 deletion wtx/src/misc/rng/xorshift.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::misc::{AtomicU64, Rng};
use crate::misc::{AtomicU64, Lease, LeaseMut, Rng};
use core::sync::atomic::Ordering;

/// Xorshift that deals with 64 bits numbers.
Expand Down Expand Up @@ -29,6 +29,20 @@ impl Rng for Xorshift64 {
}
}

impl Lease<Xorshift64> for Xorshift64 {
#[inline]
fn lease(&self) -> &Xorshift64 {
self
}
}

impl LeaseMut<Xorshift64> for Xorshift64 {
#[inline]
fn lease_mut(&mut self) -> &mut Xorshift64 {
self
}
}

impl From<u64> for Xorshift64 {
#[inline]
fn from(value: u64) -> Self {
Expand Down
23 changes: 0 additions & 23 deletions wtx/src/misc/stream/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ impl StreamReader for TcpStream {
}
}

#[cfg(unix)]
impl StreamReader for tokio::net::UnixStream {
#[inline]
async fn read(&mut self, bytes: &mut [u8]) -> crate::Result<usize> {
Ok(<Self as AsyncReadExt>::read(self, bytes).await?)
}
}

impl StreamWriter for OwnedWriteHalf {
#[inline]
async fn write_all(&mut self, bytes: &[u8]) -> crate::Result<()> {
Expand Down Expand Up @@ -83,18 +75,3 @@ impl StreamWriter for TcpStream {
Ok(())
}
}

#[cfg(unix)]
impl StreamWriter for tokio::net::UnixStream {
#[inline]
async fn write_all(&mut self, bytes: &[u8]) -> crate::Result<()> {
<Self as AsyncWriteExt>::write_all(self, bytes).await?;
Ok(())
}

#[inline]
async fn write_all_vectored(&mut self, bytes: &[&[u8]]) -> crate::Result<()> {
_local_write_all_vectored!(bytes, self, |io_slices| self.write_vectored(io_slices).await);
Ok(())
}
}
Loading

0 comments on commit a66f432

Please sign in to comment.