Skip to content

Commit

Permalink
Shared context replacing existing passing of multiple values
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtread committed Dec 31, 2023
1 parent 4efcac7 commit dba6ee6
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 105 deletions.
14 changes: 7 additions & 7 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use log::error;
use reqwest::{Client, Identity, Upgraded};
use semver::Version;
use serde::{Deserialize, Serialize};
use std::{path::Path, str::FromStr, sync::Arc};
use std::{path::Path, str::FromStr};
use thiserror::Error;
use url::Url;

Expand Down Expand Up @@ -103,11 +103,11 @@ struct ServerDetails {
#[derive(Debug, Clone)]
pub struct LookupData {
/// Server url
pub url: Arc<Url>,
pub url: Url,
/// The server version
pub version: Version,
/// Association token if the server supports providing one
pub association: Arc<Option<String>>,
pub association: Option<String>,
}

/// Errors that can occur while looking up a server
Expand Down Expand Up @@ -226,9 +226,9 @@ pub async fn lookup_server(
}

Ok(LookupData {
url: Arc::new(url),
url,
version: details.version,
association: Arc::new(details.association),
association: details.association,
})
}

Expand All @@ -254,7 +254,7 @@ pub enum ServerStreamError {
/// * `base_url` - The server base URL (Connection URL)
/// * `association` - Optional client association token
pub async fn create_server_stream(
http_client: reqwest::Client,
http_client: &reqwest::Client,
base_url: &Url,
association: Option<&String>,
) -> Result<Upgraded, ServerStreamError> {
Expand Down Expand Up @@ -398,7 +398,7 @@ pub async fn proxy_http_request(
/// * `base_url` - The server base URL (Connection URL)
/// * `association` - Association token
pub async fn create_server_tunnel(
http_client: reqwest::Client,
http_client: &reqwest::Client,
base_url: &Url,
association: &str,
) -> Result<Upgraded, ServerStreamError> {
Expand Down
14 changes: 14 additions & 0 deletions src/ctx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Shared context state that the app should store and pass to the
//! various servers when they are started
use url::Url;

/// Shared context
pub struct ClientContext {
/// HTTP client for the client to make requests with
pub http_client: reqwest::Client,
/// Base URL of the connected server
pub base_url: Url,
/// Optional association token
pub association: Option<String>,
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use semver::Version;
pub use url::Url;

pub mod api;
pub mod ctx;
pub mod fire;
pub mod servers;
pub mod update;
Expand Down
52 changes: 19 additions & 33 deletions src/servers/blaze.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,52 @@
//! Server connected to by BlazeSDK clients (Majority of the game traffic)
use super::{spawn_server_task, BLAZE_PORT};
use crate::api::create_server_stream;
use crate::{api::create_server_stream, ctx::ClientContext};
use log::{debug, error};
use std::{net::Ipv4Addr, sync::Arc};
use tokio::{
io::copy_bidirectional,
net::{TcpListener, TcpStream},
};
use url::Url;

/// Starts the blaze server
///
/// ## Arguments
/// * `http_client` - The HTTP client passed around for connection upgrades
/// * `base_url` - The server base URL to connect clients to
/// * `association` - Optional client association
pub async fn start_blaze_server(
http_client: reqwest::Client,
base_url: Arc<Url>,
association: Arc<Option<String>>,
) -> std::io::Result<()> {
/// * `ctx` - The client context
pub async fn start_blaze_server(ctx: Arc<ClientContext>) -> std::io::Result<()> {
// Bind the local socket for accepting connections
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, BLAZE_PORT)).await?;

// Accept connections
loop {
let (client_stream, _) = listener.accept().await?;

spawn_server_task(handle(
client_stream,
http_client.clone(),
base_url.clone(),
association.clone(),
));
spawn_server_task(handle(client_stream, ctx.clone()));
}
}

/// Handler for processing BlazeSDK client connections
///
/// ## Arguments
/// * `client_stream` - The client stream to read and write from
/// * `http_client` - The HTTP client passed around for connection upgrades
/// * `base_url` - The server base URL to connect clients to
/// * `association` - Client association token if supported
async fn handle(
mut client_stream: TcpStream,
http_client: reqwest::Client,
base_url: Arc<Url>,
association: Arc<Option<String>>,
) {
/// * `ctx` - The client context
async fn handle(mut client_stream: TcpStream, ctx: Arc<ClientContext>) {
debug!("Starting blaze connection");

// Create a stream to the Pocket Relay server
let mut server_stream =
match create_server_stream(http_client, &base_url, Option::as_ref(&association)).await {
Ok(stream) => stream,
Err(err) => {
error!("Failed to create server stream: {}", err);
return;
}
};
let mut server_stream = match create_server_stream(
&ctx.http_client,
&ctx.base_url,
Option::as_ref(&ctx.association),
)
.await
{
Ok(stream) => stream,
Err(err) => {
error!("Failed to create server stream: {}", err);
return;
}
};

debug!("Blaze connection linked");

Expand Down
30 changes: 10 additions & 20 deletions src/servers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! is only capable of communicating over SSLv3
use super::HTTP_PORT;
use crate::api::proxy_http_request;
use crate::{api::proxy_http_request, ctx::ClientContext};
use hyper::{
http::uri::PathAndQuery,
service::{make_service_fn, service_fn},
Expand All @@ -16,30 +16,22 @@ use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
};
use url::Url;

/// Starts the HTTP proxy server
///
/// ## Arguments
/// * `http_client` - The HTTP client passed around for sending the requests
/// * `base_url` - The server base URL to proxy requests to
pub async fn start_http_server(
http_client: reqwest::Client,
base_url: Arc<Url>,
) -> std::io::Result<()> {
/// * `ctx` - The client context
pub async fn start_http_server(ctx: Arc<ClientContext>) -> std::io::Result<()> {
// Create the socket address the server will bind too
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, HTTP_PORT));

// Create service that uses the `handle function`
let make_svc = make_service_fn(move |_conn| {
let http_client = http_client.clone();
let base_url = base_url.clone();
let ctx = ctx.clone();

async move {
// service_fn converts our function into a `Service`
Ok::<_, Infallible>(service_fn(move |request| {
handle(request, http_client.clone(), base_url.clone())
}))
Ok::<_, Infallible>(service_fn(move |request| handle(request, ctx.clone())))
}
});

Expand All @@ -54,13 +46,11 @@ pub async fn start_http_server(
/// to the Pocket Relay server
///
/// ## Arguments
/// * `request` - The HTTP request
/// * `http_client` - The HTTP client to proxy the request with
/// * `base_url` - The server base URL (Connection URL)
/// * `request` - The HTTP request
/// * `ctx` - The client context
async fn handle(
request: Request<Body>,
http_client: reqwest::Client,
base_url: Arc<Url>,
ctx: Arc<ClientContext>,
) -> Result<Response<Body>, Infallible> {
let path_and_query = request
.uri()
Expand All @@ -75,7 +65,7 @@ async fn handle(
let path_and_query = path_and_query.strip_prefix('/').unwrap_or(path_and_query);

// Create the new url from the path
let url = match base_url.join(path_and_query) {
let url = match ctx.base_url.join(path_and_query) {
Ok(value) => value,
Err(err) => {
error!("Failed to create HTTP proxy URL: {}", err);
Expand All @@ -87,7 +77,7 @@ async fn handle(
};

// Proxy the request to the server
let response = match proxy_http_request(&http_client, url).await {
let response = match proxy_http_request(&ctx.http_client, url).await {
Ok(value) => value,
Err(err) => {
error!("Failed to proxy HTTP request: {}", err);
Expand Down
26 changes: 14 additions & 12 deletions src/servers/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,41 @@
//! forwarding them to the connect Pocket Relay server
use super::{spawn_server_task, TELEMETRY_PORT};
use crate::api::{publish_telemetry_event, TelemetryEvent};
use crate::{
api::{publish_telemetry_event, TelemetryEvent},
ctx::ClientContext,
};
use log::error;
use std::{net::Ipv4Addr, sync::Arc};
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
};
use url::Url;

/// Starts the telemetry server
///
/// ## Arguments
/// * `http_client` - The HTTP client used to forward messages
/// * `base_url` - The server base URL to connect clients to
pub async fn start_telemetry_server(
http_client: reqwest::Client,
base_url: Arc<Url>,
) -> std::io::Result<()> {
/// * `ctx` - The client context
pub async fn start_telemetry_server(ctx: Arc<ClientContext>) -> std::io::Result<()> {
// Bind the local socket for accepting connections
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, TELEMETRY_PORT)).await?;

// Accept connections
loop {
let (client_stream, _) = listener.accept().await?;

spawn_server_task(handle(client_stream, http_client.clone(), base_url.clone()));
spawn_server_task(handle(client_stream, ctx.clone()));
}
}

/// Handler for processing telemetry client connections
async fn handle(mut client_stream: TcpStream, http_client: reqwest::Client, base_url: Arc<Url>) {
while let Ok(event) = read_telemetry_event(&mut client_stream).await {
if let Err(err) = publish_telemetry_event(&http_client, &base_url, event).await {
///
/// ## Arguments
/// * `stream` - The stream to decode from
/// * `ctx` - The client context
async fn handle(mut stream: TcpStream, ctx: Arc<ClientContext>) {
while let Ok(event) = read_telemetry_event(&mut stream).await {
if let Err(err) = publish_telemetry_event(&ctx.http_client, &ctx.base_url, event).await {
error!("Failed to publish telemetry event: {}", err);
}
}
Expand Down
55 changes: 22 additions & 33 deletions src/servers/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use self::codec::{TunnelCodec, TunnelMessage};
use crate::{
api::create_server_tunnel,
ctx::ClientContext,
servers::{spawn_server_task, GAME_HOST_PORT, RANDOM_PORT, TUNNEL_HOST_PORT},
};
use bytes::Bytes;
Expand All @@ -26,7 +27,6 @@ use std::{
};
use tokio::{io::ReadBuf, net::UdpSocket, sync::mpsc, try_join};
use tokio_util::codec::Framed;
use url::Url;

/// The fixed size of socket pool to use
const SOCKET_POOL_SIZE: usize = 4;
Expand All @@ -41,15 +41,9 @@ static LOCAL_SEND_TARGET: SocketAddr =
/// connection to the server
///
/// ## Arguments
/// * `http_client` - The HTTP client passed around for connection upgrades
/// * `base_url` - The server base URL to connect clients to
/// * `association` - Optional client association
pub async fn start_tunnel_server(
http_client: reqwest::Client,
base_url: Arc<Url>,
association: Arc<Option<String>>,
) -> std::io::Result<()> {
let association = match Option::as_ref(&association) {
/// * `ctx` - The client context
pub async fn start_tunnel_server(ctx: Arc<ClientContext>) -> std::io::Result<()> {
let association = match Option::as_ref(&ctx.association) {
Some(value) => value,
// Don't try and tunnel without a token
None => return Ok(()),
Expand All @@ -63,25 +57,24 @@ pub async fn start_tunnel_server(
// Looping to attempt reconnecting if lost
while attempt_errors < MAX_ERROR_ATTEMPTS {
// Create the tunnel (Future will end if tunnel stopped)
let reconnect_time =
if let Err(err) = create_tunnel(http_client.clone(), &base_url, association).await {
error!("Failed to create tunnel: {}", err);
let reconnect_time = if let Err(err) = create_tunnel(ctx.clone(), association).await {
error!("Failed to create tunnel: {}", err);

// Set last error
last_error = Some(err);
// Set last error
last_error = Some(err);

// Increase error attempts
attempt_errors += 1;
// Increase error attempts
attempt_errors += 1;

// Error should be delayed by the number of errors already hit
Duration::from_millis(1000 * attempt_errors as u64)
} else {
// Reset error attempts
attempt_errors = 0;
// Error should be delayed by the number of errors already hit
Duration::from_millis(1000 * attempt_errors as u64)
} else {
// Reset error attempts
attempt_errors = 0;

// Non errored reconnect can be quick
Duration::from_millis(1000)
};
// Non errored reconnect can be quick
Duration::from_millis(1000)
};

debug!(
"Next tunnel create attempt in: {}s",
Expand All @@ -101,15 +94,11 @@ pub async fn start_tunnel_server(
/// Creates a new tunnel
///
/// ## Arguments
/// * `http_client` - The HTTP client passed around for connection upgrades
/// * `base_url` - The server base URL to connect clients to
async fn create_tunnel(
http_client: reqwest::Client,
base_url: &Url,
association: &str,
) -> std::io::Result<()> {
/// * `ctx` - The client context
/// * `association` - The client association token
async fn create_tunnel(ctx: Arc<ClientContext>, association: &str) -> std::io::Result<()> {
// Create the tunnel with the server
let io = create_server_tunnel(http_client, base_url, association)
let io = create_server_tunnel(&ctx.http_client, &ctx.base_url, association)
.await
// Wrap the tunnel with the [`TunnelCodec`] framing
.map(|io| Framed::new(io, TunnelCodec::default()))
Expand Down

0 comments on commit dba6ee6

Please sign in to comment.