Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix documentation and also performance improvements #2

Merged
merged 4 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 1 addition & 42 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,6 @@ jobs:
test:
name: Test
runs-on: ubuntu-latest
services:
postgres:
image: postgres:12
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- 5432:5432
mailhog:
image: mailhog/mailhog:v1.0.1
ports:
- 1025:1025
- 8025:8025
redis:
image: redis:6
ports:
- 6379:6379
env:
SQLX_VERSION: 0.6.2
SQLX_FEATURES: "rustls,postgres"
steps:
- name: Checkout repository
uses: actions/checkout@v2
Expand All @@ -53,10 +33,6 @@ jobs:
toolchain: stable
override: true

- name: Migrate database
run: |
echo sudo apt-get install libpq-dev -y
echo SKIP_DOCKER=true ./scripts/init_db.sh
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
Expand Down Expand Up @@ -95,24 +71,6 @@ jobs:
coverage:
name: Code coverage
runs-on: ubuntu-latest
services:
postgres:
image: postgres:12
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- 5432:5432
mailhog:
image: mailhog/mailhog:v1.0.1
ports:
- 1025:1025
- 8025:8025
redis:
image: redis:7
ports:
- 6379:6379

steps:
- name: Checkout repository
uses: actions/checkout@v2
Expand All @@ -126,6 +84,7 @@ jobs:
- name: Run cargo-tarpaulin
uses: actions-rs/[email protected]
with:
version: 0.22.0
args: '--ignore-tests'

- name: Upload to codecov.io
Expand Down
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ of proxies are the resources available on the system on which it is run. This li
for its runtime. A simple program to proxy web traffic to a server might look like this:
```
use std::error::Error;
use tokio::sync::broadcast;
use relayport_rs::RelaySocket;
use relayport_rs::command::RelayCommand;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::broadcast;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
Expand All @@ -21,21 +22,25 @@ pub async fn main() -> Result<(), Box<dyn Error>> {

// build a relay with a listener TCP socket
let relay = RelaySocket::build()
.set_so_reuseaddr(true)?
.set_tcp_nodelay(true)?
.bind("0.0.0.0:8080)?
.set_so_reuseaddr(true)
.set_tcp_nodelay(true)
.bind("127.0.0.1:8080")?
.listen()?;

// spawn a task to handle the acceptance and dispatch of a relay connection
let _ = tokio::task::spawn(async move {
relay
.accept_and_relay("www.example.com:80", &rx)
.accept_and_relay("127.0.0.1:9090", &rx)
.await
.expect("failed to start relay")
});

// send the task a shutdown command so it exits cleanly
tx.send(RelayCommand::Shutdown)?;
// Wait for Ctrl-C to send the shutdown command
let mut sigint = signal(SignalKind::interrupt())?;
match sigint.recv().await {
Some(()) => tx.send(RelayCommand::Shutdown)?,
None => {},
}

Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Errors returned by the library

use std::net::AddrParseError;

use tokio::sync::broadcast::error::RecvError;

/// The methods of this library may return any of the errors defined here.
Expand Down
49 changes: 21 additions & 28 deletions src/relay/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
//! The abstractions that allow relaying of TCP communications
//! Abstractions that relay TCP communication

use std::io::ErrorKind;
use std::net::SocketAddr;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::sync::broadcast::Receiver;
use tracing::{debug, trace};
use tracing::debug;

use crate::command::RelayCommand;
use crate::RelayPortError;

const MTU: usize = 1518;

/// Abstraction for initiating the relay builder.
#[derive(Debug)]
pub struct RelaySocket {}

/// Builder abstraction for composing a relay.
/// Builder abstraction for composing a TCP relay.
#[derive(Copy, Clone, Debug)]
pub struct RelaySocketBuilder(RelayInner);

Expand Down Expand Up @@ -286,7 +284,6 @@ impl RelayListener {
/// .bind("127.0.0.1:10443")?
/// .listen()?;
///
///
/// // spawn a task to handle the acceptance and dispatch of a relay
/// let _ = tokio::task::spawn(async move {
/// listener
Expand All @@ -296,7 +293,7 @@ impl RelayListener {
/// });
///
/// // Do other work
/// tokio::time::sleep(std::time::Duration::from_millis(120));
/// tokio::time::sleep(std::time::Duration::from_secs(60));
///
/// // send the task a shutdown command so it exits cleanly
/// tx.send(RelayCommand::Shutdown)?;
Expand All @@ -321,7 +318,7 @@ impl RelayListener {
result = cancel.recv() => {match result {
Ok(cmd) => match cmd {
RelayCommand::Shutdown => {
debug!("received shutdown relay command");
debug!("received relay command: shutdown");
break
},
},
Expand Down Expand Up @@ -378,37 +375,33 @@ async fn relay_inner(
) -> Result<usize, RelayPortError> {
let _from_addr = read_sock.peer_addr().unwrap();
let _to_addr = write_sock.peer_addr().unwrap();
let mut buf = vec![0u8; MTU];
let mut relay_bytes = 0;
loop {
let read_bytes;
tokio::select! {
biased;
result = read_sock.read(buf.as_mut()) => {
let copy_reader_to_writer = tokio::io::copy(read_sock, write_sock);
let await_shutdown = rx.recv();
let mut read_bytes = Ok(0);
tokio::select! {
biased;
result = copy_reader_to_writer => {
read_bytes = result.or_else(|e| match e.kind() {
ErrorKind::ConnectionReset => Ok(0),
_ => Err(e),
})
}
result = rx.recv() => { match result {
result = await_shutdown => { match result {
Ok(cmd) => match cmd {
RelayCommand::Shutdown => {
debug!("received shutdown relay command");
break
},
},
Err(e) => return Err(RelayPortError::InternalCommunicationError(e)),
}}
}
match read_bytes {
Ok(bytes) => {
write_sock.write_all(&buf[0..bytes]).await?;
relay_bytes += bytes;
trace!(bytes, total_bytes = relay_bytes);
}
Err(e) => return Err(RelayPortError::IoError(e)),
}
}

Ok(relay_bytes)
match read_bytes {
Ok(bytes) => {
debug!("Transferred {bytes} bytes");
let _ = write_sock.shutdown().await;
Ok(bytes as usize)
}
Err(e) => Err(RelayPortError::IoError(e)),
}
}
Loading