Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #275 #276

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions wtx-docs/src/web-socket/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Implementation of [RFC6455](https://datatracker.ietf.org/doc/html/rfc6455) and [RFC7692](https://datatracker.ietf.org/doc/html/rfc7692). WebSocket is a communication protocol that enables full-duplex communication between a client (typically a web browser) and a server over a single TCP connection. Unlike traditional HTTP, which is request-response based, WebSocket allows real-time data exchange without the need for polling.

In-house benchmarks are available at <https://c410-f3r.github.io/wtx-bench>. If you are aware of other benchmark tools, please open an discussion in the GitHub project.
In-house benchmarks are available at <https://c410-f3r.github.io/wtx-bench>. If you are aware of other benchmark tools, please open a discussion in the GitHub project.

To use this functionality, it is necessary to activate the `web-socket` feature.

Expand All @@ -28,11 +28,11 @@ To make everything work as intended both parties, client and server, need to imp
## Client Example

```rust,edition2021,no_run
{{#rustdoc_include ../../../wtx-instances/generic-examples/web-socket-client.rs}}
{{#rustdoc_include ../../../wtx-instances/web-socket-examples/web-socket-client.rs}}
```

## Server Example

```rust,edition2021,no_run
{{#rustdoc_include ../../../wtx-instances/generic-examples/web-socket-server.rs}}
{{#rustdoc_include ../../../wtx-instances/web-socket-examples/web-socket-server.rs}}
```
29 changes: 18 additions & 11 deletions wtx-instances/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ required-features = ["grpc"]
[[example]]
name = "grpc-server"
path = "generic-examples/grpc-server.rs"
required-features = ["grpc", "wtx/tokio-rustls", "wtx/webpki-roots"]
required-features = ["grpc", "wtx/tokio-rustls"]

[[example]]
name = "http-client-framework"
Expand All @@ -69,16 +69,6 @@ name = "pool"
path = "generic-examples/pool.rs"
required-features = ["wtx/pool"]

[[example]]
name = "web-socket-client"
path = "generic-examples/web-socket-client.rs"
required-features = ["wtx/web-socket-handshake", "wtx/webpki-roots"]

[[example]]
name = "web-socket-server"
path = "generic-examples/web-socket-server.rs"
required-features = ["tokio-rustls", "wtx/pool", "wtx/tokio-rustls", "wtx/web-socket-handshake"]

# HTTP Server Framework Examples

[[example]]
Expand Down Expand Up @@ -118,6 +108,23 @@ name = "http2-web-socket"
path = "http2-examples/http2-web-socket.rs"
required-features = ["wtx/http2", "wtx/tokio-rustls", "wtx/web-socket"]

# WebSocket Examples

[[example]]
name = "web-socket-client"
path = "web-socket-examples/web-socket-client.rs"
required-features = ["wtx/web-socket-handshake"]

[[example]]
name = "web-socket-concurrent-client"
path = "web-socket-examples/web-socket-concurrent-client.rs"
required-features = ["wtx/tokio-rustls", "wtx/web-socket-handshake", "wtx/webpki-roots"]

[[example]]
name = "web-socket-server"
path = "web-socket-examples/web-socket-server.rs"
required-features = ["tokio-rustls", "wtx/pool", "wtx/tokio-rustls", "wtx/web-socket-handshake"]

[build-dependencies]
pb-rs = { default-features = false, optional = true, version = "0.10" }

Expand Down
88 changes: 44 additions & 44 deletions wtx-instances/generic-examples/client-api-framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,52 +80,52 @@ mod generic_web_socket_subscription {
pub type GenericWebSocketSubscriptionRes = u64;
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
async fn http_pair(
) -> Pair<PkgsAux<GenericThrottlingApi, SerdeJson, HttpParams>, ClientFrameworkTokio> {
Pair::new(
PkgsAux::from_minimum(
GenericThrottlingApi {
rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))),
},
SerdeJson,
HttpParams::from_uri("ws://generic_web_socket_uri.com".into()),
),
ClientFrameworkTokio::tokio(1).build(),
)
}
async fn http_pair(
) -> Pair<PkgsAux<GenericThrottlingApi, SerdeJson, HttpParams>, ClientFrameworkTokio> {
Pair::new(
PkgsAux::from_minimum(
GenericThrottlingApi {
rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))),
},
SerdeJson,
HttpParams::from_uri("ws://generic_web_socket_uri.com".into()),
),
ClientFrameworkTokio::tokio(1).build(),
)
}

async fn web_socket_pair() -> wtx::Result<
Pair<
PkgsAux<GenericThrottlingApi, SerdeJson, WsParams>,
WebSocketClient<(), TcpStream, WebSocketBuffer>,
>,
> {
let uri = Uri::new("ws://generic_web_socket_uri.com");
let web_socket = WebSocketClient::connect(
(),
[],
false,
Xorshift64::from(simple_seed()),
TcpStream::connect(uri.hostname_with_implied_port()).await?,
&uri,
WebSocketBuffer::default(),
|_| wtx::Result::Ok(()),
)
.await?;
Ok(Pair::new(
PkgsAux::from_minimum(
GenericThrottlingApi {
rt: RequestThrottling::from_rl(RequestLimit::new(40, Duration::from_secs(2))),
},
SerdeJson,
WsParams::default(),
),
web_socket,
))
}
async fn web_socket_pair() -> wtx::Result<
Pair<
PkgsAux<GenericThrottlingApi, SerdeJson, WsParams>,
WebSocketClient<(), TcpStream, WebSocketBuffer>,
>,
> {
let uri = Uri::new("ws://generic_web_socket_uri.com");
let web_socket = WebSocketClient::connect(
(),
[],
false,
Xorshift64::from(simple_seed()),
TcpStream::connect(uri.hostname_with_implied_port()).await?,
&uri,
WebSocketBuffer::default(),
|_| wtx::Result::Ok(()),
)
.await?;
Ok(Pair::new(
PkgsAux::from_minimum(
GenericThrottlingApi {
rt: RequestThrottling::from_rl(RequestLimit::new(40, Duration::from_secs(2))),
},
SerdeJson,
WsParams::default(),
),
web_socket,
))
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
let mut hp = http_pair().await;
let _http_response_tuple = hp
.trans
Expand Down
2 changes: 1 addition & 1 deletion wtx-instances/src/bin/autobahn-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() -> wtx::Result<()> {
|_| wtx::Result::Ok(()),
)
.await?;
let (mut common, mut reader, mut writer) = ws.parts();
let (mut common, mut reader, mut writer) = ws.parts_mut();
loop {
let mut frame = match reader.read_frame(&mut common).await {
Err(_err) => {
Expand Down
2 changes: 1 addition & 1 deletion wtx-instances/src/bin/autobahn-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> wtx::Result<()> {
async fn handle(
mut ws: WebSocketServer<Option<NegotiatedFlate2>, TcpStream, &mut WebSocketBuffer>,
) -> wtx::Result<()> {
let (mut common, mut reader, mut writer) = ws.parts();
let (mut common, mut reader, mut writer) = ws.parts_mut();
loop {
let mut frame = reader.read_frame(&mut common).await?;
match frame.op_code() {
Expand Down
50 changes: 50 additions & 0 deletions wtx-instances/web-socket-examples/web-socket-concurrent-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! WebSocket client that reads and writes frames in different tasks.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use tokio::{net::TcpStream, sync::Mutex};
use wtx::{
misc::{simple_seed, Arc, TokioRustlsConnector, Uri, Xorshift64},
web_socket::{Frame, OpCode, WebSocketBuffer, WebSocketClient},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
let uri = Uri::new("ws://www.example.com");
let connector = TokioRustlsConnector::from_auto()?.push_certs(wtx_instances::ROOT_CA)?;
let stream = TcpStream::connect(uri.hostname_with_implied_port()).await?;
let ws = WebSocketClient::connect(
(),
[],
false,
Xorshift64::from(simple_seed()),
connector.connect_without_client_auth(uri.hostname(), stream).await?,
&uri.to_ref(),
WebSocketBuffer::default(),
|_| wtx::Result::Ok(()),
)
.await?;
let (mut reader, mut writer) = ws.into_parts::<Arc<Mutex<_>>, _, _>(|el| tokio::io::split(el));
let reader_jh = tokio::spawn(async move {
loop {
let frame = reader.read_frame().await?;
match (frame.op_code(), frame.text_payload()) {
(_, Some(elem)) => println!("{elem}"),
(OpCode::Close, _) => break,
_ => {}
}
}
wtx::Result::Ok(())
});
let writer_jh = tokio::spawn(async move {
writer.write_frame(&mut Frame::new_fin(OpCode::Text, *b"Hi and Bye")).await?;
writer.write_frame(&mut Frame::new_fin(OpCode::Close, [])).await?;
wtx::Result::Ok(())
});
let (reader_rslt, writer_rslt) = tokio::join!(reader_jh, writer_jh);
reader_rslt??;
writer_rslt??;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> wtx::Result<()> {
async fn handle(
mut ws: WebSocketServer<(), TlsStream<TcpStream>, &mut WebSocketBuffer>,
) -> wtx::Result<()> {
let (mut common, mut reader, mut writer) = ws.parts();
let (mut common, mut reader, mut writer) = ws.parts_mut();
loop {
let mut frame = reader.read_frame(&mut common).await?;
match frame.op_code() {
Expand Down
2 changes: 1 addition & 1 deletion wtx/src/http2/web_socket_over_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ where
Http2RecvStatus::Ongoing(data) => (data, false),
};
let mut slice = data.as_slice();
let rfi = ReadFrameInfo::from_bytes::<_, false>(&mut slice, usize::MAX, &(), no_masking)?;
let rfi = ReadFrameInfo::from_bytes::<false>(&mut slice, usize::MAX, (true, 0), no_masking)?;
let before = buffer.len();
buffer.extend_from_copyable_slice(slice)?;
unmask_nb::<false>(buffer.get_mut(before..).unwrap_or_default(), no_masking, &rfi)?;
Expand Down
6 changes: 3 additions & 3 deletions wtx/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,14 @@ where
}

#[inline]
pub(crate) async fn _read_payload<S>(
pub(crate) async fn _read_payload<SR>(
(header_len, payload_len): (usize, usize),
network_buffer: &mut PartitionedFilledBuffer,
read: &mut usize,
stream: &mut S,
stream: &mut SR,
) -> crate::Result<()>
where
S: StreamReader,
SR: StreamReader,
{
let frame_len = header_len.wrapping_add(payload_len);
network_buffer._reserve(frame_len)?;
Expand Down
16 changes: 16 additions & 0 deletions wtx/src/misc/connection_state.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::misc::{Lease, LeaseMut};

/// The state of a connection between two parties.
#[derive(Clone, Copy, Debug)]
pub enum ConnectionState {
Expand All @@ -21,6 +23,20 @@ impl ConnectionState {
}
}

impl Lease<ConnectionState> for ConnectionState {
#[inline]
fn lease(&self) -> &ConnectionState {
self
}
}

impl LeaseMut<ConnectionState> for ConnectionState {
#[inline]
fn lease_mut(&mut self) -> &mut ConnectionState {
self
}
}

impl From<bool> for ConnectionState {
#[inline]
fn from(from: bool) -> Self {
Expand Down
16 changes: 15 additions & 1 deletion wtx/src/misc/partitioned_filled_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::misc::{FilledBuffer, FilledBufferWriter, VectorError};
use crate::misc::{FilledBuffer, FilledBufferWriter, Lease, LeaseMut, VectorError};
use core::ops::Range;

// ```
Expand Down Expand Up @@ -174,6 +174,20 @@ impl PartitionedFilledBuffer {
}
}

impl Lease<PartitionedFilledBuffer> for PartitionedFilledBuffer {
#[inline]
fn lease(&self) -> &PartitionedFilledBuffer {
self
}
}

impl LeaseMut<PartitionedFilledBuffer> for PartitionedFilledBuffer {
#[inline]
fn lease_mut(&mut self) -> &mut PartitionedFilledBuffer {
self
}
}

impl Default for PartitionedFilledBuffer {
#[inline]
fn default() -> Self {
Expand Down
16 changes: 15 additions & 1 deletion wtx/src/misc/rng/xorshift.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::misc::{AtomicU64, Rng};
use crate::misc::{AtomicU64, Lease, LeaseMut, Rng};
use core::sync::atomic::Ordering;

/// Xorshift that deals with 64 bits numbers.
Expand Down Expand Up @@ -29,6 +29,20 @@ impl Rng for Xorshift64 {
}
}

impl Lease<Xorshift64> for Xorshift64 {
#[inline]
fn lease(&self) -> &Xorshift64 {
self
}
}

impl LeaseMut<Xorshift64> for Xorshift64 {
#[inline]
fn lease_mut(&mut self) -> &mut Xorshift64 {
self
}
}

impl From<u64> for Xorshift64 {
#[inline]
fn from(value: u64) -> Self {
Expand Down
23 changes: 0 additions & 23 deletions wtx/src/misc/stream/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ impl StreamReader for TcpStream {
}
}

#[cfg(unix)]
impl StreamReader for tokio::net::UnixStream {
#[inline]
async fn read(&mut self, bytes: &mut [u8]) -> crate::Result<usize> {
Ok(<Self as AsyncReadExt>::read(self, bytes).await?)
}
}

impl StreamWriter for OwnedWriteHalf {
#[inline]
async fn write_all(&mut self, bytes: &[u8]) -> crate::Result<()> {
Expand Down Expand Up @@ -83,18 +75,3 @@ impl StreamWriter for TcpStream {
Ok(())
}
}

#[cfg(unix)]
impl StreamWriter for tokio::net::UnixStream {
#[inline]
async fn write_all(&mut self, bytes: &[u8]) -> crate::Result<()> {
<Self as AsyncWriteExt>::write_all(self, bytes).await?;
Ok(())
}

#[inline]
async fn write_all_vectored(&mut self, bytes: &[&[u8]]) -> crate::Result<()> {
_local_write_all_vectored!(bytes, self, |io_slices| self.write_vectored(io_slices).await);
Ok(())
}
}
Loading
Loading