From 92e2772ba312980ce764261d83358a744cd5693b Mon Sep 17 00:00:00 2001 From: Michael Telahun Date: Sat, 2 Dec 2023 07:48:30 +0300 Subject: [PATCH 1/4] Improve documentation - ensure the code snippet in README actually works - minor doc wording changes --- README.md | 19 ++++++++++++------- src/error.rs | 1 - src/relay/tcp.rs | 7 +++---- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 1d88e2a..e475adf 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,10 @@ of proxies are the resources available on the system on which it is run. This li for its runtime. A simple program to proxy web traffic to a server might look like this: ``` use std::error::Error; -use tokio::sync::broadcast; use relayport_rs::RelaySocket; use relayport_rs::command::RelayCommand; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::broadcast; #[tokio::main] pub async fn main() -> Result<(), Box> { @@ -21,21 +22,25 @@ pub async fn main() -> Result<(), Box> { // build a relay with a listener TCP socket let relay = RelaySocket::build() - .set_so_reuseaddr(true)? - .set_tcp_nodelay(true)? - .bind("0.0.0.0:8080)? + .set_so_reuseaddr(true) + .set_tcp_nodelay(true) + .bind("127.0.0.1:8080")? .listen()?; // spawn a task to handle the acceptance and dispatch of a relay connection let _ = tokio::task::spawn(async move { relay - .accept_and_relay("www.example.com:80", &rx) + .accept_and_relay("127.0.0.1:9090", &rx) .await .expect("failed to start relay") }); - // send the task a shutdown command so it exits cleanly - tx.send(RelayCommand::Shutdown)?; + // Wait for Ctrl-C to send the shutdown command + let mut sigint = signal(SignalKind::interrupt())?; + match sigint.recv().await { + Some(()) => { tx.send(RelayCommand::Shutdown)?; }, + None => {}, + } Ok(()) } diff --git a/src/error.rs b/src/error.rs index 0f77ca8..5250c57 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,7 +1,6 @@ //! Errors returned by the library use std::net::AddrParseError; - use tokio::sync::broadcast::error::RecvError; /// The methods of this library may return any of the errors defined here. diff --git a/src/relay/tcp.rs b/src/relay/tcp.rs index 31509db..e371651 100644 --- a/src/relay/tcp.rs +++ b/src/relay/tcp.rs @@ -1,4 +1,4 @@ -//! The abstractions that allow relaying of TCP communications +//! Abstractions that relay TCP communication use std::io::ErrorKind; use std::net::SocketAddr; @@ -18,7 +18,7 @@ const MTU: usize = 1518; #[derive(Debug)] pub struct RelaySocket {} -/// Builder abstraction for composing a relay. +/// Builder abstraction for composing a TCP relay. #[derive(Copy, Clone, Debug)] pub struct RelaySocketBuilder(RelayInner); @@ -286,7 +286,6 @@ impl RelayListener { /// .bind("127.0.0.1:10443")? /// .listen()?; /// - /// /// // spawn a task to handle the acceptance and dispatch of a relay /// let _ = tokio::task::spawn(async move { /// listener @@ -296,7 +295,7 @@ impl RelayListener { /// }); /// /// // Do other work - /// tokio::time::sleep(std::time::Duration::from_millis(120)); + /// tokio::time::sleep(std::time::Duration::from_secs(60)); /// /// // send the task a shutdown command so it exits cleanly /// tx.send(RelayCommand::Shutdown)?; From aafb309af545dd8a508f66a3dc44f54ee8e44a20 Mon Sep 17 00:00:00 2001 From: Michael Telahun Date: Tue, 5 Dec 2023 11:02:09 +0300 Subject: [PATCH 2/4] doc: make README code clearer --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e475adf..16f1394 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ pub async fn main() -> Result<(), Box> { // Wait for Ctrl-C to send the shutdown command let mut sigint = signal(SignalKind::interrupt())?; match sigint.recv().await { - Some(()) => { tx.send(RelayCommand::Shutdown)?; }, + Some(()) => tx.send(RelayCommand::Shutdown)?, None => {}, } From 51c5a3e449149187a020c25999e89a01cb9887a7 Mon Sep 17 00:00:00 2001 From: Michael Telahun Date: Tue, 5 Dec 2023 11:02:25 +0300 Subject: [PATCH 3/4] Make the code faster and small bugfix o use tokio::io::copy instead of rolling our own version. This led to a marked performance improvement in throughput o when we receive a 0 byte read shutdown the other end of the connection. Otherwise we end up with both sides of the connection remaining open until the relay is shutdown. --- src/relay/tcp.rs | 42 ++++++++++++++++++------------------------ 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/src/relay/tcp.rs b/src/relay/tcp.rs index e371651..e21afc7 100644 --- a/src/relay/tcp.rs +++ b/src/relay/tcp.rs @@ -3,17 +3,15 @@ use std::io::ErrorKind; use std::net::SocketAddr; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; use tokio::sync::broadcast::Receiver; -use tracing::{debug, trace}; +use tracing::debug; use crate::command::RelayCommand; use crate::RelayPortError; -const MTU: usize = 1518; - /// Abstraction for initiating the relay builder. #[derive(Debug)] pub struct RelaySocket {} @@ -320,7 +318,7 @@ impl RelayListener { result = cancel.recv() => {match result { Ok(cmd) => match cmd { RelayCommand::Shutdown => { - debug!("received shutdown relay command"); + debug!("received relay command: shutdown"); break }, }, @@ -377,37 +375,33 @@ async fn relay_inner( ) -> Result { let _from_addr = read_sock.peer_addr().unwrap(); let _to_addr = write_sock.peer_addr().unwrap(); - let mut buf = vec![0u8; MTU]; - let mut relay_bytes = 0; - loop { - let read_bytes; - tokio::select! { - biased; - result = read_sock.read(buf.as_mut()) => { + let copy_reader_to_writer = tokio::io::copy(read_sock, write_sock); + let await_shutdown = rx.recv(); + let mut read_bytes = Ok(0); + tokio::select! { + biased; + result = copy_reader_to_writer => { read_bytes = result.or_else(|e| match e.kind() { ErrorKind::ConnectionReset => Ok(0), _ => Err(e), }) } - result = rx.recv() => { match result { + result = await_shutdown => { match result { Ok(cmd) => match cmd { RelayCommand::Shutdown => { debug!("received shutdown relay command"); - break }, }, Err(e) => return Err(RelayPortError::InternalCommunicationError(e)), }} - } - match read_bytes { - Ok(bytes) => { - write_sock.write_all(&buf[0..bytes]).await?; - relay_bytes += bytes; - trace!(bytes, total_bytes = relay_bytes); - } - Err(e) => return Err(RelayPortError::IoError(e)), - } } - Ok(relay_bytes) + match read_bytes { + Ok(bytes) => { + debug!("Transferred {bytes} bytes"); + let _ = write_sock.shutdown().await; + Ok(bytes as usize) + } + Err(e) => Err(RelayPortError::IoError(e)), + } } From 1f2245bdac2f74139bb6cc10591d441c10f60558 Mon Sep 17 00:00:00 2001 From: Michael Telahun Date: Tue, 5 Dec 2023 11:15:04 +0300 Subject: [PATCH 4/4] Update GH workflow - remove database related actions - pin tarpaulin to ver. 0.22.0 --- .github/workflows/rust.yml | 43 +------------------------------------- 1 file changed, 1 insertion(+), 42 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 99d834d..909233b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -14,26 +14,6 @@ jobs: test: name: Test runs-on: ubuntu-latest - services: - postgres: - image: postgres:12 - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: password - ports: - - 5432:5432 - mailhog: - image: mailhog/mailhog:v1.0.1 - ports: - - 1025:1025 - - 8025:8025 - redis: - image: redis:6 - ports: - - 6379:6379 - env: - SQLX_VERSION: 0.6.2 - SQLX_FEATURES: "rustls,postgres" steps: - name: Checkout repository uses: actions/checkout@v2 @@ -53,10 +33,6 @@ jobs: toolchain: stable override: true - - name: Migrate database - run: | - echo sudo apt-get install libpq-dev -y - echo SKIP_DOCKER=true ./scripts/init_db.sh - name: Run cargo test uses: actions-rs/cargo@v1 with: @@ -95,24 +71,6 @@ jobs: coverage: name: Code coverage runs-on: ubuntu-latest - services: - postgres: - image: postgres:12 - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: password - ports: - - 5432:5432 - mailhog: - image: mailhog/mailhog:v1.0.1 - ports: - - 1025:1025 - - 8025:8025 - redis: - image: redis:7 - ports: - - 6379:6379 - steps: - name: Checkout repository uses: actions/checkout@v2 @@ -126,6 +84,7 @@ jobs: - name: Run cargo-tarpaulin uses: actions-rs/tarpaulin@v0.1 with: + version: 0.22.0 args: '--ignore-tests' - name: Upload to codecov.io