diff --git a/Cargo.lock b/Cargo.lock index 82b3f85..f6b1fd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1199,6 +1199,7 @@ dependencies = [ "rand", "serde", "serde_json", + "time", "tokio", "tokio-tungstenite", "tracing", @@ -1210,6 +1211,7 @@ name = "bevygap_matchmaker_httpd" version = "0.1.0" dependencies = [ "anyhow", + "async-nats", "axum", "bevygap_shared", "clap", diff --git a/Cargo.toml b/Cargo.toml index bad15c3..153187e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ tokio-tungstenite = "0.23" clap = { version = "4.4", features = ["derive"] } # same LY ver as game: lightyear = {git = "https://github.com/cBournhonesque/lightyear.git", rev = "0b7a4852fbb2eb9848d2fbc04dd4eea70d51ea98", default-features = false} - +time = {version = "0.3.36", features = ["std"]} base64 = "0.22" # edgegap = {path = "./edgegap-client"} url = "^2.5" diff --git a/README.md b/README.md index 5086d98..5500e6e 100644 --- a/README.md +++ b/README.md @@ -159,4 +159,8 @@ Access to fetch at 'http://127.0.0.1:3000/wannaplay' from origin 'http://lan-mac ## TODO The client needs to be configured with the path to the matchmaker_httpd endpoint. -Possibly defaulting to /matchmaker/* on current domain? \ No newline at end of file +Possibly defaulting to /matchmaker/* on current domain? + +Need to guarantee no leaked sessions. +If MM gives session and player connects, server will delete session on player disconnect. +if they fail to connect to server after getting a session, session leaks atm. \ No newline at end of file diff --git a/bevygap_matchmaker/Cargo.toml b/bevygap_matchmaker/Cargo.toml index 8a52d66..1b0b7c0 100644 --- a/bevygap_matchmaker/Cargo.toml +++ b/bevygap_matchmaker/Cargo.toml @@ -11,7 +11,7 @@ futures.workspace = true futures-util.workspace = true tokio.workspace = true tokio-tungstenite.workspace = true - +time.workspace = true edgegap = {path = "../edgegap-client"} serde.workspace = true diff --git a/bevygap_matchmaker/src/main.rs b/bevygap_matchmaker/src/main.rs index 1afa7cb..d9fc664 100644 --- a/bevygap_matchmaker/src/main.rs +++ b/bevygap_matchmaker/src/main.rs @@ -1,6 +1,7 @@ // use std::time::Duration; -use async_nats::jetstream; +// use async_nats::jetstream; +// use async_nats::jetstream::stream::Stream; // use async_nats::jetstream::stream::StorageType; use async_nats::Client; use clap::Parser; @@ -10,13 +11,18 @@ use edgegap::apis::configuration::*; use futures::stream::StreamExt; use lightyear::connection::netcode::PRIVATE_KEY_BYTES; use log::*; -use session_service::session_cleanup_supervisor; use tracing_subscriber::{layer::*, util::*}; use bevygap_shared::*; +mod session_delete_worker; +mod session_reaper; mod session_service; +use session_delete_worker::*; +use session_reaper::*; +use session_service::*; + fn edgegap_configuration(_settings: &Settings) -> Configuration { let key = std::env::var("EDGEGAP_API_KEY").expect("EDGEGAP_API_KEY environment variable is not set"); @@ -47,7 +53,7 @@ pub struct Settings { } impl Settings { - pub fn private_key_bytes(&self) -> [u8; PRIVATE_KEY_BYTES] { + fn parse_private_key(&self) -> [u8; PRIVATE_KEY_BYTES] { if self.lightyear_private_key.is_empty() { return [0u8; PRIVATE_KEY_BYTES]; } @@ -99,6 +105,7 @@ pub(crate) struct MatchmakerState { nats: BevygapNats, api_config: Configuration, settings: Settings, + lypkey: [u8; PRIVATE_KEY_BYTES], } impl MatchmakerState { @@ -108,17 +115,8 @@ impl MatchmakerState { pub(crate) fn configuration(&self) -> &Configuration { &self.api_config } - pub(crate) fn kv_s2c(&self) -> &jetstream::kv::Store { - self.nats.kv_s2c() - } - pub(crate) fn kv_c2s(&self) -> &jetstream::kv::Store { - self.nats.kv_c2s() - } - pub(crate) fn kv_sessions(&self) -> &jetstream::kv::Store { - self.nats.kv_sessions() - } - pub(crate) fn kv_cert_digests(&self) -> &jetstream::kv::Store { - self.nats.kv_cert_digests() + pub(crate) fn lightyear_private_key(&self) -> [u8; PRIVATE_KEY_BYTES] { + self.lypkey } } @@ -127,23 +125,23 @@ async fn main() -> Result<(), async_nats::Error> { setup_logging(); info!("Starting Edgegap Matchmaker"); let bgnats = BevygapNats::new_and_connect("matchmaker").await.unwrap(); - let settings = Settings::parse(); - // info!("priv key bytes: {:?}", settings.private_key_bytes()); - + let lypkey = settings.parse_private_key(); let api_config = edgegap_configuration(&settings); - let mm_state = MatchmakerState { nats: bgnats, api_config, settings, + lypkey, }; // ensure the specified app, version, and deployment are valid and ready for players. verify_application(&mm_state).await?; let state = mm_state.clone(); - let _watcher = tokio::spawn(async move { session_cleanup_supervisor(&state).await }); + let _a = tokio::spawn(async move { session_cleanup_supervisor(&state).await }); + let state = mm_state.clone(); + let _b = tokio::spawn(async move { delete_session_worker_supervisor(&state).await }); let state = mm_state.clone(); let _watcher = tokio::spawn(async move { @@ -155,7 +153,7 @@ async fn main() -> Result<(), async_nats::Error> { let state = mm_state.clone(); let session_service = tokio::spawn(async move { - match session_service::session_request_supervisor(&state).await { + match session_request_supervisor(&state).await { Ok(_) => info!("Session service completed"), Err(e) => error!("Error in session service: {}", e), } @@ -195,13 +193,13 @@ async fn verify_application(state: &MatchmakerState) -> Result<(), async_nats::E info!("🟢 Application version '{}' is active.", app_version.name); } else { error!( - "🔴 Application version '{}' is not active, aborting.", + "🔴 Application version '{}' is not active, won't be able to create sessions.", app_version.name ); - std::process::exit(1); + // std::process::exit(1); } - info!("✅ {} @ {}", settings.app_name, settings.app_version); + // info!("✅ {} @ {}", settings.app_name, settings.app_version); Ok(()) } diff --git a/bevygap_matchmaker/src/session_delete_worker.rs b/bevygap_matchmaker/src/session_delete_worker.rs new file mode 100644 index 0000000..e9032b7 --- /dev/null +++ b/bevygap_matchmaker/src/session_delete_worker.rs @@ -0,0 +1,73 @@ +use crate::MatchmakerState; +use async_nats::jetstream::{self}; +use edgegap::apis::sessions_api::*; +use futures::StreamExt; +use log::*; + +// need an erlang/OTP like supervision tree! +pub async fn delete_session_worker_supervisor( + state: &MatchmakerState, +) -> Result<(), async_nats::Error> { + loop { + let state = state.clone(); + let handle = tokio::spawn(async move { + let res = delete_session_worker(&state).await; + if let Err(e) = res { + error!("delete_session_worker error: {e:?}"); + } + }); + futures::future::join_all([handle]).await; + warn!("delete_session_worker exited, restarting after timeout"); + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + } + Ok(()) +} + +async fn delete_session_worker(state: &MatchmakerState) -> Result<(), async_nats::Error> { + let stream = state.nats.delete_session_stream(); + let consumer = stream + .create_consumer(jetstream::consumer::pull::Config { + durable_name: Some("api-deleter-1".to_string()), + description: Some("Calls edgegap session delete api".to_string()), + ack_policy: jetstream::consumer::AckPolicy::Explicit, + ..Default::default() + }) + .await?; + + loop { + let mut messages = consumer.fetch().max_messages(100).messages().await?; + while let Some(Ok(message)) = messages.next().await { + let session_id = String::from_utf8(message.payload.to_vec())?; + match session_delete(state.configuration(), session_id.as_str()).await { + Ok(session_delete_response) => { + info!("session_delete ok: {:?}", session_delete_response); + message.ack().await?; + } + Err(edgegap::apis::Error::ResponseError(resp_content)) => { + match resp_content.status.as_u16() { + 404 => { + // session already deleted or never existed. + warn!("session_delete 404: {session_id}"); + message.ack().await?; + } + 410 => { + // "instance already terminated" + warn!("session_delete 410 'instance already terminated': {session_id}"); + message.ack().await?; + } + code => { + error!("session_delete error status = {code} for {session_id} {resp_content:?}"); + } + } + } + Err(e) => { + // TODO What to do about junk data on queue that can never be deleted? + error!("unhandled session_delete error {session_id}: {e:?}"); + } + } + } + tokio::time::sleep(std::time::Duration::from_millis(5000)).await; + } + + Ok(()) +} diff --git a/bevygap_matchmaker/src/session_reaper.rs b/bevygap_matchmaker/src/session_reaper.rs new file mode 100644 index 0000000..1a7d762 --- /dev/null +++ b/bevygap_matchmaker/src/session_reaper.rs @@ -0,0 +1,96 @@ +/// Detects orphaned edgegap sessions and schedules them for deletion by the API +/// Actual API-delete call happens in the session_delete_worker. +use crate::MatchmakerState; +use ::time::OffsetDateTime; +use async_nats::jetstream::kv::Operation; +use futures::{StreamExt, TryStreamExt}; +use log::*; +use tokio::time::{self, Duration}; + +pub(crate) async fn session_cleanup_supervisor( + orig_state: &MatchmakerState, +) -> Result<(), async_nats::Error> { + let state = orig_state.clone(); + let handle1 = tokio::spawn(async move { + loop { + let _ = session_cleanup_watcher(&state).await; + error!("session_cleanup_watcher exited, restarting"); + } + }); + let state = orig_state.clone(); + let handle2 = tokio::spawn(async move { + loop { + let _ = unclaimed_session_reaper(&state).await; + error!("unclaimed_session_reaper exited, restarting"); + } + }); + futures::future::join_all([handle1, handle2]).await; + Ok(()) +} + +/// Get all the session keys in unclaimed sessions - if any are older than 30 seconds, +/// enqueue them for deletion. +/// Session ids must be removed from unclaimed_sessions once a gameserver connection happens. +async fn unclaimed_session_reaper(state: &MatchmakerState) -> Result<(), async_nats::Error> { + // how often to check for orphaned sessions: + let mut interval = time::interval(Duration::from_millis(5000)); + let kv = state.nats.kv_unclaimed_sessions(); + loop { + interval.tick().await; + let mut keys = kv.keys().await?.boxed(); + while let Some(key) = keys.try_next().await? { + let Ok(Some(entry)) = kv.entry(&key).await else { + continue; + }; + let session_id = String::from_utf8(entry.value.to_vec()) + .expect("Failed to convert session_id to string"); + let age = OffsetDateTime::now_utc() - entry.created; + info!("* Session {session_id} is {age} old"); + if age > Duration::from_secs(30) { + warn!("Unclaimed session {session_id} is older than 30 seconds = {age}"); + // write to delete_sessions work queue and remove from unclaimed_sessions KV + state + .nats + .enqueue_session_delete(session_id.clone()) + .await?; + kv.delete(&key).await?; + } + } + } + Ok(()) +} + +/// Deletes sessions once a gameserver removes the active_sessions KV entry. +/// this is the happy path, where there were no orphans.. +async fn session_cleanup_watcher(state: &MatchmakerState) -> Result<(), async_nats::Error> { + let kv = state.nats.kv_active_connections(); + let mut watcher = kv.watch(">").await?; + while let Some(event) = watcher.next().await { + info!("{event:?}"); + match event { + Ok(event) => { + let session_id = event.key; + if event.operation == Operation::Delete { + info!("active_connection deleted, deleting session {session_id}",); + state + .nats + .enqueue_session_delete(session_id.clone()) + .await?; + } + if event.operation == Operation::Put { + info!("New Session put {session_id}, deleting from unclaimed_sessions "); + // delete this session_id from unclaimed_sessions. + let _ = state + .nats + .kv_unclaimed_sessions() + .delete(session_id.as_str()) + .await; + } + } + Err(e) => { + warn!("KV event error watching for session cleanup: {:?}", e); + } + } + } + Ok(()) +} diff --git a/bevygap_matchmaker/src/session_service.rs b/bevygap_matchmaker/src/session_service.rs index 391e756..c12bcde 100644 --- a/bevygap_matchmaker/src/session_service.rs +++ b/bevygap_matchmaker/src/session_service.rs @@ -1,14 +1,12 @@ -use std::net::SocketAddr; - use crate::MatchmakerState; -use async_nats::{jetstream::kv::Operation, service::ServiceExt}; +use async_nats::service::ServiceExt; use base64::prelude::*; use edgegap::{apis::sessions_api::*, apis::Error as EdgegapError, models::SessionModel}; use futures::StreamExt; use lightyear::prelude::ConnectToken; use log::*; use serde::{de, Deserialize, Serialize}; -// use std::{env, str::from_utf8, time::Duration}; +use std::net::SocketAddr; #[derive(Deserialize, Debug)] struct SessionRequest { @@ -54,18 +52,6 @@ fn decode_request(raw: &[u8]) -> Result { }) } -pub async fn session_cleanup_supervisor(state: &MatchmakerState) -> Result<(), async_nats::Error> { - let state = state.clone(); - let handle = tokio::spawn(async move { - loop { - let _ = session_cleanup_watcher(&state).await; - error!("session_cleanup_watcher exited, restarting"); - } - }); - futures::future::join_all([handle]).await; - Ok(()) -} - pub async fn session_request_supervisor(state: &MatchmakerState) -> Result<(), async_nats::Error> { let handles = (0..5).map(|_| session_request_handler(state)); @@ -74,32 +60,6 @@ pub async fn session_request_supervisor(state: &MatchmakerState) -> Result<(), a Ok(()) } -async fn session_cleanup_watcher(state: &MatchmakerState) -> Result<(), async_nats::Error> { - let kv = state.kv_sessions(); - let mut watcher = kv.watch(">").await?; - while let Some(event) = watcher.next().await { - info!("{event:?}"); - match event { - Ok(event) => { - let session_id = event.key; - if event.operation == Operation::Delete { - info!("active_connection deleted, deleting session {session_id}",); - session_delete(state.configuration(), session_id.as_str()) - .await - .expect("Failed to delete session"); - } - if event.operation == Operation::Put { - info!("New Session put {session_id} "); - } - } - Err(e) => { - warn!("KV event error watching for session cleanup: {:?}", e); - } - } - } - Ok(()) -} - async fn session_request_handler(state: &MatchmakerState) -> Result<(), async_nats::Error> { let client = state.nats_client(); info!("Listening for session requests on 'session_requests'"); @@ -133,11 +93,30 @@ async fn session_request_handler(state: &MatchmakerState) -> Result<(), async_na .await .unwrap(); } + + Err(edgegap::apis::Error::ResponseError(e)) => { + error!("edgegap api error: {:?}", e); + let (err_code, err_msg) = match e.entity { + Some(SessionPostError::Status400(ee)) => (400, ee.message), + Some(SessionPostError::Status401(ee)) => (401, ee.message), + Some(SessionPostError::Status409(ee)) => (409, ee.message), + _ => (999, "unknown error".to_string()), + }; + error!("error in session_responder: {err_code}={err_msg}"); + request + .respond(Err(async_nats::service::error::Error { + status: err_msg, + code: err_code, + })) + .await + .unwrap(); + } Err(e) => { + error!("Unhandled error in session_responder: {:?}", e); request .respond(Err(async_nats::service::error::Error { status: format!("error generating session: {}", e), - code: 0, + code: 500, // internal server error })) .await .unwrap(); @@ -148,8 +127,8 @@ async fn session_request_handler(state: &MatchmakerState) -> Result<(), async_na // TODO: not sure how to properly respond with an error here. request .respond(Err(async_nats::service::error::Error { - status: format!("error decoding session request: {}", e), - code: 0, + status: "error decoding session request!".to_string(), + code: 400, // bad request })) .await .unwrap(); @@ -163,6 +142,10 @@ async fn session_request_handler(state: &MatchmakerState) -> Result<(), async_na Ok(()) } +// if app version disabled, get_session returns: +// Nats-Service-Error-Code: 0 +//Nats-Service-Error: error generating session: error in response: status code 400 Bad Request + /// Generate the session on edgegap, the connect token, and reply via nats: /// /// * Call edgegap's session creation API @@ -203,12 +186,15 @@ async fn session_responder( {"session_id": "950dd2eaff09-S", "status": "Ready", "ready": true, "kind": "Seat", "user_count": 1, "linked": true, "webhook_url": "https://example.com/hook/session", "deployment_request_id": "57f84a8e1298"} - so perhaps we should just watch a KV value for the session, and update it with polling and from - the webhook, whichever happens first, then reply to the client. + + so perhaps we should have clients watch a requesting_session.SESSION_ID queue, + and write updates to that (from polling or a webhook). + */ let mut session_get; let mut tries = 0; + let mut first_seen_session_id = false; tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; loop { tries += 1; @@ -224,6 +210,22 @@ async fn session_responder( info!("{session_get:?}"); + // Avoid session leakage! + // the first time we get a response with a session_id, we store it in unclaimed_sessions, + // so we can automatically delete it if it goes unused. + if !first_seen_session_id { + first_seen_session_id = true; + let session_id_str = session_get.session_id.clone(); + info!("Writing session_id to unclaimed_sessions KV: {session_id_str}"); + let val = session_id_str.clone().into(); + state + .nats + .kv_unclaimed_sessions() + .put(session_id_str, val) + .await + .expect("Failed to put session_id in unclaimed_sessions KV"); + } + if session_get.ready { break; } @@ -278,6 +280,7 @@ async fn session_responder( // TODO once the session is ready, the cert digest should have been reported, but // there is definitely a race here so we should block on it for a second or so? let cert_digest = state + .nats .kv_cert_digests() .get(public_ip_str) .await @@ -292,13 +295,13 @@ async fn session_responder( info!( "🏠 BUILD ConnectToken: server_addresses = {server_addresses} proto id: {}, client_id: {client_id}, privkey: {:?}", state.settings.protocol_id(), - state.settings.private_key_bytes() + state.lightyear_private_key() ); let token = ConnectToken::build( server_addresses, state.settings.protocol_id(), client_id, - state.settings.private_key_bytes(), + state.lightyear_private_key(), ) .generate() .expect("Failed to generate token"); @@ -310,6 +313,7 @@ async fn session_responder( // lookup based on clientid. let client_id_str = client_id.to_string(); state + .nats .kv_c2s() .put( client_id_str.as_str(), @@ -323,6 +327,7 @@ async fn session_responder( )) })?; state + .nats .kv_s2c() .put(session_get.session_id.as_str(), client_id_str.into()) .await @@ -347,3 +352,13 @@ async fn session_responder( Ok(resp) } + +// prevent session leaks: +/* + When a client connects to the game server, the server puts the session_id to KV active_sessions. + + If we write session_id to unclaimed_sessions on issue, then delete when it appears in active_sessions, + we can poll the unclaimed_sessions for keys older than a timeout, for deletion? + + +*/ diff --git a/bevygap_matchmaker_httpd/Cargo.toml b/bevygap_matchmaker_httpd/Cargo.toml index 2ed6f9c..ee033cd 100644 --- a/bevygap_matchmaker_httpd/Cargo.toml +++ b/bevygap_matchmaker_httpd/Cargo.toml @@ -18,6 +18,7 @@ bevygap_shared = { path = "../bevygap_shared" } anyhow = "1.0" tower-http.workspace = true clap.workspace = true +async-nats.workspace = true [lints] workspace = true diff --git a/bevygap_matchmaker_httpd/src/main.rs b/bevygap_matchmaker_httpd/src/main.rs index be3b481..d2dd0d5 100644 --- a/bevygap_matchmaker_httpd/src/main.rs +++ b/bevygap_matchmaker_httpd/src/main.rs @@ -1,6 +1,8 @@ +use async_nats::client::RequestErrorKind; use axum::extract::{Request, State}; use axum::http::{header, HeaderValue, Method}; use axum::routing::post; +use axum::Json; use axum::{ extract::ConnectInfo, extract::Query, @@ -122,7 +124,8 @@ async fn wannaplay_handler( Query(params): Query, State(state): State>, req: Request, -) -> Result { +) -> Response { + //Result { // client_ip is the one sent to Edgegap, to decide which server to assign the player to. // We use one provided in the qs, otherwise the connecting IP of the http client. let mut client_ip = params.client_ip.unwrap_or(addr.ip().to_string()); @@ -146,17 +149,58 @@ async fn wannaplay_handler( info!("wannaplay_handler req for ip {client_ip}"); let payload = format!("{{\"client_ip\":\"{client_ip}\"}}"); - let resp = state + match state .bgnats .client() // .request_with_headers(subject, headers, payload) .request("session.gensession", payload.into()) - .await?; + .await + { + // Don't really understand the reasoning here, but if you respond to a service + // request with an Err, you still get an Ok here, and have to examine the headers + // to figure out if it was actually an error? + // see: https://github.com/nats-io/nats.rs/blob/main/async-nats/tests/service_tests.rs#L245 + Ok(resp) => { + if let Some((code, msg)) = message_error(&resp) { + error!("Got error matchmaker response: {:?}", msg); + ( + StatusCode::from_u16(code as u16).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), + msg, + ) + .into_response() + } else { + info!("Got OK matchmaker response: {:?}", resp); + ([(header::CONTENT_TYPE, "text/json")], resp.payload).into_response() + } + } + Err(e) => { + warn!("Got Err matchmaker response: {:?}", e); + match e.kind() { + RequestErrorKind::TimedOut => { + (StatusCode::REQUEST_TIMEOUT, "Request timeout").into_response() + } + RequestErrorKind::NoResponders => { + (StatusCode::SERVICE_UNAVAILABLE, "No service responders").into_response() + } + RequestErrorKind::Other => { + (StatusCode::INTERNAL_SERVER_ERROR, "Unhandled error").into_response() + } + } + } + } +} - info!("Got matchmaker response: {:?}", resp); - // resp.payload - let reply = ([(header::CONTENT_TYPE, "text/json")], resp.payload); - Ok(reply) +fn message_error(message: &async_nats::Message) -> Option<(usize, String)> { + let h = message.headers.clone()?; + if let Some(code) = h.get(async_nats::service::NATS_SERVICE_ERROR_CODE) { + let msg_str = h + .get(async_nats::service::NATS_SERVICE_ERROR) + .unwrap() + .to_string(); + Some((code.as_str().parse::().unwrap(), msg_str)) + } else { + None + } } /// Serde deserialization decorator to map empty Strings to None, @@ -184,28 +228,3 @@ fn setup_logging() { .with(tracing_subscriber::fmt::Layer::default().compact()) .init(); } - -// Make our own error that wraps `anyhow::Error`. -struct AppError(anyhow::Error); - -// Tell axum how to convert `AppError` into a response. -impl IntoResponse for AppError { - fn into_response(self) -> Response { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Something went wrong: {}", self.0), - ) - .into_response() - } -} - -// This enables using `?` on functions that return `Result<_, anyhow::Error>` to turn them into -// `Result<_, AppError>`. That way you don't need to do that manually. -impl From for AppError -where - E: Into, -{ - fn from(err: E) -> Self { - Self(err.into()) - } -} diff --git a/bevygap_server_plugin/src/plugin.rs b/bevygap_server_plugin/src/plugin.rs index 457897d..32ae69b 100644 --- a/bevygap_server_plugin/src/plugin.rs +++ b/bevygap_server_plugin/src/plugin.rs @@ -167,7 +167,7 @@ fn setup_nats(runtime: ResMut, mut commands: Commands) { info!("NATS connected"); let kv_c2s = bgnats.kv_c2s().clone(); - let kv_sessions = bgnats.kv_sessions().clone(); + let kv_sessions = bgnats.kv_active_connections().clone(); let kv_cert_digests = bgnats.kv_cert_digests().clone(); let client = bgnats.client().clone(); @@ -198,14 +198,17 @@ fn setup_nats(runtime: ResMut, mut commands: Commands) { panic!("Client ID is not mapped to a session id! wtf."); } Some(session_id) => { - let key = String::from_utf8(session_id.into()) + let session_id_key = String::from_utf8(session_id.into()) .expect("Failed to convert session_id to string"); - info!("Client ID {client_id} associated with session id: {key}",); - client_id_to_session_id.insert(client_id, key.clone()); + info!("Client ID {client_id} associated with session id: {session_id_key}",); + client_id_to_session_id.insert(client_id, session_id_key.clone()); kv_sessions - .put(key, client_id.to_string().into()) + .put(session_id_key, client_id.to_string().into()) .await .expect("Failed to put client_id in KV"); + // delete the mappings. + // this signifies the session + // let _ = kv_c2s.delete(client_id.to_string()).await; } } } diff --git a/bevygap_shared/src/lib.rs b/bevygap_shared/src/lib.rs index fde61aa..b2e3f8c 100644 --- a/bevygap_shared/src/lib.rs +++ b/bevygap_shared/src/lib.rs @@ -1,4 +1,5 @@ -use async_nats::jetstream; +use async_nats::jetstream::stream::Stream; +use async_nats::jetstream::{self, stream}; use async_nats::Client; use std::time::Duration; @@ -11,22 +12,30 @@ pub struct BevygapNats { kv_s2c: jetstream::kv::Store, kv_c2s: jetstream::kv::Store, kv_cert_digests: jetstream::kv::Store, - kv_sessions: jetstream::kv::Store, + kv_active_connections: jetstream::kv::Store, + kv_unclaimed_sessions: jetstream::kv::Store, + delete_session_stream: Stream, } +const DELETE_SESSION_STREAM: &str = "edgegap_delete_session_q"; + impl BevygapNats { /// Connects to NATS based on environment variables. pub async fn new_and_connect(nats_client_name: &str) -> Result { let client = Self::connect_to_nats(nats_client_name).await?; let (kv_s2c, kv_c2s) = Self::create_kv_buckets_for_session_mappings(client.clone()).await?; - let kv_sessions = Self::create_kv_sessions(client.clone()).await?; + let kv_active_connections = Self::create_kv_active_connections(client.clone()).await?; let kv_cert_digests = Self::create_kv_cert_digests(client.clone()).await?; + let kv_unclaimed_sessions = Self::create_kv_unclaimed_sessions(client.clone()).await?; + let delete_session_stream = Self::create_session_delete_queue(&client).await?; Ok(Self { client, kv_s2c, kv_c2s, kv_cert_digests, - kv_sessions, + kv_active_connections, + kv_unclaimed_sessions, + delete_session_stream, }) } @@ -39,12 +48,33 @@ impl BevygapNats { pub fn kv_c2s(&self) -> &jetstream::kv::Store { &self.kv_c2s } - pub fn kv_sessions(&self) -> &jetstream::kv::Store { - &self.kv_sessions + pub fn kv_active_connections(&self) -> &jetstream::kv::Store { + &self.kv_active_connections + } + pub fn kv_unclaimed_sessions(&self) -> &jetstream::kv::Store { + &self.kv_unclaimed_sessions } pub fn kv_cert_digests(&self) -> &jetstream::kv::Store { &self.kv_cert_digests } + pub fn delete_session_stream(&self) -> &Stream { + &self.delete_session_stream + } + + /// Enqueues a job to delete a session id via the edgegap API + pub async fn enqueue_session_delete( + &self, + session_id: String, + ) -> Result<(), async_nats::Error> { + let js = jetstream::new(self.client.clone()); + js.publish( + format!("{DELETE_SESSION_STREAM}.{session_id}"), + session_id.into(), + ) + .await? + .await?; + Ok(()) + } async fn connect_to_nats(nats_client_name: &str) -> Result { info!("Setting up NATS, client name: {}", nats_client_name); @@ -77,7 +107,7 @@ impl BevygapNats { Ok(client) } - pub async fn create_kv_sessions( + pub async fn create_kv_active_connections( client: Client, ) -> Result { let jetstream = jetstream::new(client); @@ -90,6 +120,34 @@ impl BevygapNats { Ok(kv) } + pub async fn create_kv_unclaimed_sessions( + client: Client, + ) -> Result { + let jetstream = jetstream::new(client); + let kv = jetstream + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "unclaimed_sessions".to_string(), + max_value_size: 1024, + description: "Any session ids we get from the API are stored here, and if they key age gets too big, we delete the session via the API.".to_string(), + ..Default::default() + }) + .await?; + Ok(kv) + } + + pub async fn create_session_delete_queue(client: &Client) -> Result { + let js = jetstream::new(client.clone()); + let stream = js + .create_stream(jetstream::stream::Config { + name: "DELETE_SESSION_STREAM".to_string(), + retention: stream::RetentionPolicy::WorkQueue, + subjects: vec![format!("{DELETE_SESSION_STREAM}.*").to_string()], + ..Default::default() + }) + .await?; + Ok(stream) + } + pub async fn create_kv_cert_digests( client: Client, ) -> Result { @@ -100,6 +158,7 @@ impl BevygapNats { description: "Maps server public ip to their self-signed cert digests".to_string(), max_age: Duration::from_secs(86400 * 14), max_value_size: 1024, + ..Default::default() }) .await?;