Skip to content

Commit

Permalink
refactor: [#681] udp return errors instead of panicking
Browse files Browse the repository at this point in the history
  • Loading branch information
hungfnt committed May 2, 2024
1 parent 340e9e0 commit 144f875
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 100 deletions.
16 changes: 8 additions & 8 deletions src/console/clients/udp/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Client {
let binding_address = local_bind_to.parse().context("binding local address")?;

debug!("Binding to: {local_bind_to}");
let udp_client = UdpClient::bind(&local_bind_to).await;
let udp_client = UdpClient::bind(&local_bind_to).await?;

let bound_to = udp_client.socket.local_addr().context("bound local address")?;
debug!("Bound to: {bound_to}");
Expand All @@ -88,7 +88,7 @@ impl Client {

match &self.udp_tracker_client {
Some(client) => {
client.udp_client.connect(&tracker_socket_addr.to_string()).await;
client.udp_client.connect(&tracker_socket_addr.to_string()).await?;
self.remote_socket = Some(*tracker_socket_addr);
Ok(())
}
Expand Down Expand Up @@ -116,9 +116,9 @@ impl Client {

match &self.udp_tracker_client {
Some(client) => {
client.send(connect_request.into()).await;
client.send(connect_request.into()).await?;

let response = client.receive().await;
let response = client.receive().await?;

debug!("connection request response:\n{response:#?}");

Expand Down Expand Up @@ -163,9 +163,9 @@ impl Client {

match &self.udp_tracker_client {
Some(client) => {
client.send(announce_request.into()).await;
client.send(announce_request.into()).await?;

let response = client.receive().await;
let response = client.receive().await?;

debug!("announce request response:\n{response:#?}");

Expand Down Expand Up @@ -200,9 +200,9 @@ impl Client {

match &self.udp_tracker_client {
Some(client) => {
client.send(scrape_request.into()).await;
client.send(scrape_request.into()).await?;

let response = client.receive().await;
let response = client.receive().await?;

debug!("scrape request response:\n{response:#?}");

Expand Down
214 changes: 122 additions & 92 deletions src/shared/bit_torrent/tracker/udp/client.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use core::result::Result::{Err, Ok};
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE};
use anyhow::anyhow;
use anyhow::{Context, Result};
use core::result::Result::{Ok, Err};
use anyhow::Error as AError;
use anyhow::{anyhow, Context, Result};
use aquatic_udp_protocol::{ConnectRequest, Request, Response, TransactionId};
use log::debug;
use tokio::net::UdpSocket;
use tokio::time;

use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE};

/// Default timeout for sending and receiving packets. And waiting for sockets
/// to be readable and writable.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
Expand All @@ -28,107 +27,120 @@ pub struct UdpClient {
}

impl UdpClient {
/// # Panics
/// # Errors
///
/// Will panic if the local address can't be bound.
/// Will return error if the local address can't be bound.
///
pub async fn bind(local_address: &str) -> Result<Self> {
let socket_addr = local_address.parse::<SocketAddr>().map_err(|err| err).context("{local_address} is not a valid socket address")?;
let socket_addr = local_address
.parse::<SocketAddr>()
.context(format!("{local_address} is not a valid socket address"))?;

let socket = UdpSocket::bind(socket_addr).await?;

let udp_client = Self {
socket: Arc::new(socket),
timeout: DEFAULT_TIMEOUT,
};
Ok(udp_client)
}

/// # Panics
/// # Errors
///
/// Will panic if can't connect to the socket.
/// Will return error if can't connect to the socket.
pub async fn connect(&self, remote_address: &str) -> anyhow::Result<()> {
let socket_addr = remote_address.parse::<SocketAddr>().map_err(|err| err).context(format!("{} is not a valid socket address", remote_address))?;
self.socket.connect(socket_addr).await.map_err(|err| err)?;
Ok(())
let socket_addr = remote_address
.parse::<SocketAddr>()
.context(format!("{remote_address} is not a valid socket address"))?;

match self.socket.connect(socket_addr).await {
Ok(()) => {
debug!("Connected successfully");
Ok(())
}
Err(e) => Err(anyhow!("Failed to connect: {e:?}")),
}
}

/// # Panics
/// # Errors
///
/// Will panic if:
/// Will return error if:
///
/// - Can't write to the socket.
/// - Can't send data.
pub async fn send(&self, bytes: &[u8]) -> Result<usize, anyhow::Error> {
debug!(target: "UDP client", "sending {bytes:?} ...");

let _:Result<(), anyhow::Error> = match time::timeout(self.timeout, self.socket.writable()).await {
match time::timeout(self.timeout, self.socket.writable()).await {
Ok(writable_result) => {
let writable_result_status : Result<(), anyhow::Error> = match writable_result {
Ok(()) => Ok(()),
Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {e:?}"))
match writable_result {
Ok(()) => (),
Err(e) => return Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")),
};
writable_result_status
}
Err(e) => Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}"))
Err(e) => return Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")),
};

let send_status:Result<usize, anyhow::Error> = match time::timeout(self.timeout, self.socket.send(bytes)).await {
Ok(send_result) => {
let send_result_status: Result<usize, anyhow::Error> = match send_result {
Ok(size) => Ok(size),
Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {}", e))
};
send_result_status
}
Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {}", e))
};
send_status
match time::timeout(self.timeout, self.socket.send(bytes)).await {
Ok(send_result) => match send_result {
Ok(size) => Ok(size),
Err(e) => Err(anyhow!("IO error during send: {e:?}")),
},
Err(e) => Err(anyhow!("Send operation timed out: {e:?}")),
}
}

/// # Panics
/// # Errors
///
/// Will panic if:
/// Will return error if:
///
/// - Can't read from the socket.
/// - Can't receive data.
///
/// # Panics
///
pub async fn receive(&self, bytes: &mut [u8]) -> Result<usize> {
debug!(target: "UDP client", "receiving ...");

let _ :Result<(), anyhow::Error>= match time::timeout(self.timeout, self.socket.readable()).await {
match time::timeout(self.timeout, self.socket.readable()).await {
Ok(readable_result) => {
let readable_result_status = match readable_result {
Ok(()) => Ok(()),
Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")),
match readable_result {
Ok(()) => (),
Err(e) => return Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")),
};
readable_result_status
},
Err(e) => Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")),
}
Err(e) => return Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")),
};

let size: Result<usize, anyhow::Error> = match time::timeout(self.timeout, self.socket.recv(bytes)).await {
let size_result = match time::timeout(self.timeout, self.socket.recv(bytes)).await {
Ok(recv_result) => match recv_result {
Ok(size) => Ok(size),
Err(e) => Err(anyhow!("IO error during send: {e:?}")),
},
Err(e) => Err(anyhow!("Receive operation timed out: {e:?}")),
};

debug!(target: "UDP client", "{size} bytes received {bytes:?}");

size
if size_result.is_ok() {
let size = size_result.as_ref().unwrap();
debug!(target: "UDP client", "{size} bytes received {bytes:?}");
size_result
} else {
size_result
}
}

}

/// Creates a new `UdpClient` connected to a Udp server
///
/// # Errors
///
/// Will return any errors present in the call stack
///
pub async fn new_udp_client_connected(remote_address: &str) -> Result<UdpClient> {
let port = 0; // Let OS choose an unused port.
match UdpClient::bind(&source_address(port)).await {
Ok(client) => {
client.connect(remote_address).await;
Ok(client)
}
Err(err) => Err(err),
}
}
let client = UdpClient::bind(&source_address(port)).await?;
client.connect(remote_address).await?;
Ok(client)
}

#[allow(clippy::module_name_repetitions)]
Expand All @@ -138,85 +150,103 @@ pub struct UdpTrackerClient {
}

impl UdpTrackerClient {
/// # Panics
/// # Errors
///
/// Will panic if can't write request to bytes.
/// Will return error if can't write request to bytes.
pub async fn send(&self, request: Request) -> Result<usize> {
debug!(target: "UDP tracker client", "send request {request:?}");

// Write request into a buffer
let request_buffer = vec![0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(request_buffer);

let request_data = match request.write(&mut cursor) {
let request_data_result = match request.write(&mut cursor) {
Ok(()) => {
#[allow(clippy::cast_possible_truncation)]
let position = cursor.position() as usize;
let inner_request_buffer = cursor.get_ref();
// Return slice which contains written request data
&inner_request_buffer[..position]
Ok(&inner_request_buffer[..position])
}
Err(e) => Err(anyhow!("could not write request to bytes: {e}.")),
};

let request_data = request_data_result?;

self.udp_client.send(request_data).await
}

/// # Panics
/// # Errors
///
/// Will panic if can't create response from the received payload (bytes buffer).
pub async fn receive(&self) -> Response {
/// Will return error if can't create response from the received payload (bytes buffer).
pub async fn receive(&self) -> Result<Response> {
let mut response_buffer = [0u8; MAX_PACKET_SIZE];

let payload_size = self.udp_client.receive(&mut response_buffer).await;
let payload_size = self.udp_client.receive(&mut response_buffer).await?;

debug!(target: "UDP tracker client", "received {payload_size} bytes. Response {response_buffer:?}");

Response::from_bytes(&response_buffer[..payload_size], true).unwrap()
let response = Response::from_bytes(&response_buffer[..payload_size], true)?;

Ok(response)
}
}

/// Creates a new `UdpTrackerClient` connected to a Udp Tracker server
pub async fn new_udp_tracker_client_connected(remote_address: &str) -> UdpTrackerClient {
///
/// # Errors
///
/// Will return any errors present in the call stack
///
pub async fn new_udp_tracker_client_connected(remote_address: &str) -> Result<UdpTrackerClient> {
let udp_client = new_udp_client_connected(remote_address).await?;
UdpTrackerClient { udp_client.unwrap() }
let udp_tracker_client = UdpTrackerClient { udp_client };
Ok(udp_tracker_client)
}

/// Helper Function to Check if a UDP Service is Connectable
///
/// # Errors
/// # Panics
///
/// It will return an error if unable to connect to the UDP service.
///
/// # Panics
/// # Errors
///
pub async fn check(binding: &SocketAddr) -> Result<String, String> {
debug!("Checking Service (detail): {binding:?}.");

let client = new_udp_tracker_client_connected(binding.to_string().as_str()).await;

let connect_request = ConnectRequest {
transaction_id: TransactionId(123),
};

client.send(connect_request.into()).await;

let process = move |response| {
if matches!(response, Response::Connect(_connect_response)) {
Ok("Connected".to_string())
} else {
Error("Did not Connect".to_string())
}
};

let sleep = time::sleep(Duration::from_millis(2000));
tokio::pin!(sleep);

tokio::select! {
() = &mut sleep => {
Error("Timed Out".to_string())
}
response = client.receive() => {
process(response)
match new_udp_tracker_client_connected(binding.to_string().as_str()).await {
Ok(client) => {
let connect_request = ConnectRequest {
transaction_id: TransactionId(123),
};

// client.send() return usize, but doesn't use here
match client.send(connect_request.into()).await {
Ok(_) => (),
Err(e) => debug!("Error: {e:?}."),
};

let process = move |response| {
if matches!(response, Response::Connect(_connect_response)) {
Ok("Connected".to_string())
} else {
Err("Did not Connect".to_string())
}
};

let sleep = time::sleep(Duration::from_millis(2000));
tokio::pin!(sleep);

tokio::select! {
() = &mut sleep => {
Err("Timed Out".to_string())
}
response = client.receive() => {
process(response.unwrap())
}
}
}
Err(e) => Err(format!("{e:?}")),
}
}

0 comments on commit 144f875

Please sign in to comment.