Skip to content

Commit

Permalink
orphaned session protection, funnel service errors to http client
Browse files Browse the repository at this point in the history
  • Loading branch information
RJ committed Oct 16, 2024
1 parent 59f64f5 commit 1ba6e22
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 119 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ tokio-tungstenite = "0.23"
clap = { version = "4.4", features = ["derive"] }
# same LY ver as game:
lightyear = {git = "https://github.com/cBournhonesque/lightyear.git", rev = "0b7a4852fbb2eb9848d2fbc04dd4eea70d51ea98", default-features = false}

time = {version = "0.3.36", features = ["std"]}
base64 = "0.22"
# edgegap = {path = "./edgegap-client"}
url = "^2.5"
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,8 @@ Access to fetch at 'http://127.0.0.1:3000/wannaplay' from origin 'http://lan-mac
## TODO

The client needs to be configured with the path to the matchmaker_httpd endpoint.
Possibly defaulting to /matchmaker/* on current domain?
Possibly defaulting to /matchmaker/* on current domain?

Need to guarantee no leaked sessions.
If MM gives session and player connects, server will delete session on player disconnect.
if they fail to connect to server after getting a session, session leaks atm.
2 changes: 1 addition & 1 deletion bevygap_matchmaker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ futures.workspace = true
futures-util.workspace = true
tokio.workspace = true
tokio-tungstenite.workspace = true

time.workspace = true
edgegap = {path = "../edgegap-client"}

serde.workspace = true
Expand Down
44 changes: 21 additions & 23 deletions bevygap_matchmaker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// use std::time::Duration;

use async_nats::jetstream;
// use async_nats::jetstream;
// use async_nats::jetstream::stream::Stream;
// use async_nats::jetstream::stream::StorageType;
use async_nats::Client;
use clap::Parser;
Expand All @@ -10,13 +11,18 @@ use edgegap::apis::configuration::*;
use futures::stream::StreamExt;
use lightyear::connection::netcode::PRIVATE_KEY_BYTES;
use log::*;
use session_service::session_cleanup_supervisor;
use tracing_subscriber::{layer::*, util::*};

use bevygap_shared::*;

mod session_delete_worker;
mod session_reaper;
mod session_service;

use session_delete_worker::*;
use session_reaper::*;
use session_service::*;

fn edgegap_configuration(_settings: &Settings) -> Configuration {
let key =
std::env::var("EDGEGAP_API_KEY").expect("EDGEGAP_API_KEY environment variable is not set");
Expand Down Expand Up @@ -47,7 +53,7 @@ pub struct Settings {
}

impl Settings {
pub fn private_key_bytes(&self) -> [u8; PRIVATE_KEY_BYTES] {
fn parse_private_key(&self) -> [u8; PRIVATE_KEY_BYTES] {
if self.lightyear_private_key.is_empty() {
return [0u8; PRIVATE_KEY_BYTES];
}
Expand Down Expand Up @@ -99,6 +105,7 @@ pub(crate) struct MatchmakerState {
nats: BevygapNats,
api_config: Configuration,
settings: Settings,
lypkey: [u8; PRIVATE_KEY_BYTES],
}

impl MatchmakerState {
Expand All @@ -108,17 +115,8 @@ impl MatchmakerState {
pub(crate) fn configuration(&self) -> &Configuration {
&self.api_config
}
pub(crate) fn kv_s2c(&self) -> &jetstream::kv::Store {
self.nats.kv_s2c()
}
pub(crate) fn kv_c2s(&self) -> &jetstream::kv::Store {
self.nats.kv_c2s()
}
pub(crate) fn kv_sessions(&self) -> &jetstream::kv::Store {
self.nats.kv_sessions()
}
pub(crate) fn kv_cert_digests(&self) -> &jetstream::kv::Store {
self.nats.kv_cert_digests()
pub(crate) fn lightyear_private_key(&self) -> [u8; PRIVATE_KEY_BYTES] {
self.lypkey
}
}

Expand All @@ -127,23 +125,23 @@ async fn main() -> Result<(), async_nats::Error> {
setup_logging();
info!("Starting Edgegap Matchmaker");
let bgnats = BevygapNats::new_and_connect("matchmaker").await.unwrap();

let settings = Settings::parse();
// info!("priv key bytes: {:?}", settings.private_key_bytes());

let lypkey = settings.parse_private_key();
let api_config = edgegap_configuration(&settings);

let mm_state = MatchmakerState {
nats: bgnats,
api_config,
settings,
lypkey,
};

// ensure the specified app, version, and deployment are valid and ready for players.
verify_application(&mm_state).await?;

let state = mm_state.clone();
let _watcher = tokio::spawn(async move { session_cleanup_supervisor(&state).await });
let _a = tokio::spawn(async move { session_cleanup_supervisor(&state).await });
let state = mm_state.clone();
let _b = tokio::spawn(async move { delete_session_worker_supervisor(&state).await });

let state = mm_state.clone();
let _watcher = tokio::spawn(async move {
Expand All @@ -155,7 +153,7 @@ async fn main() -> Result<(), async_nats::Error> {

let state = mm_state.clone();
let session_service = tokio::spawn(async move {
match session_service::session_request_supervisor(&state).await {
match session_request_supervisor(&state).await {
Ok(_) => info!("Session service completed"),
Err(e) => error!("Error in session service: {}", e),
}
Expand Down Expand Up @@ -195,13 +193,13 @@ async fn verify_application(state: &MatchmakerState) -> Result<(), async_nats::E
info!("🟢 Application version '{}' is active.", app_version.name);
} else {
error!(
"🔴 Application version '{}' is not active, aborting.",
"🔴 Application version '{}' is not active, won't be able to create sessions.",
app_version.name
);
std::process::exit(1);
// std::process::exit(1);
}

info!("✅ {} @ {}", settings.app_name, settings.app_version);
// info!("✅ {} @ {}", settings.app_name, settings.app_version);

Ok(())
}
Expand Down
73 changes: 73 additions & 0 deletions bevygap_matchmaker/src/session_delete_worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use crate::MatchmakerState;
use async_nats::jetstream::{self};
use edgegap::apis::sessions_api::*;
use futures::StreamExt;
use log::*;

// need an erlang/OTP like supervision tree!
pub async fn delete_session_worker_supervisor(
state: &MatchmakerState,
) -> Result<(), async_nats::Error> {
loop {
let state = state.clone();
let handle = tokio::spawn(async move {
let res = delete_session_worker(&state).await;
if let Err(e) = res {
error!("delete_session_worker error: {e:?}");
}
});
futures::future::join_all([handle]).await;
warn!("delete_session_worker exited, restarting after timeout");
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
}
Ok(())
}

async fn delete_session_worker(state: &MatchmakerState) -> Result<(), async_nats::Error> {
let stream = state.nats.delete_session_stream();
let consumer = stream
.create_consumer(jetstream::consumer::pull::Config {
durable_name: Some("api-deleter-1".to_string()),
description: Some("Calls edgegap session delete api".to_string()),
ack_policy: jetstream::consumer::AckPolicy::Explicit,
..Default::default()
})
.await?;

loop {
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(Ok(message)) = messages.next().await {
let session_id = String::from_utf8(message.payload.to_vec())?;
match session_delete(state.configuration(), session_id.as_str()).await {
Ok(session_delete_response) => {
info!("session_delete ok: {:?}", session_delete_response);
message.ack().await?;
}
Err(edgegap::apis::Error::ResponseError(resp_content)) => {
match resp_content.status.as_u16() {
404 => {
// session already deleted or never existed.
warn!("session_delete 404: {session_id}");
message.ack().await?;
}
410 => {
// "instance already terminated"
warn!("session_delete 410 'instance already terminated': {session_id}");
message.ack().await?;
}
code => {
error!("session_delete error status = {code} for {session_id} {resp_content:?}");
}
}
}
Err(e) => {
// TODO What to do about junk data on queue that can never be deleted?
error!("unhandled session_delete error {session_id}: {e:?}");
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
}

Ok(())
}
96 changes: 96 additions & 0 deletions bevygap_matchmaker/src/session_reaper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/// Detects orphaned edgegap sessions and schedules them for deletion by the API
/// Actual API-delete call happens in the session_delete_worker.
use crate::MatchmakerState;
use ::time::OffsetDateTime;
use async_nats::jetstream::kv::Operation;
use futures::{StreamExt, TryStreamExt};
use log::*;
use tokio::time::{self, Duration};

pub(crate) async fn session_cleanup_supervisor(
orig_state: &MatchmakerState,
) -> Result<(), async_nats::Error> {
let state = orig_state.clone();
let handle1 = tokio::spawn(async move {
loop {
let _ = session_cleanup_watcher(&state).await;
error!("session_cleanup_watcher exited, restarting");
}
});
let state = orig_state.clone();
let handle2 = tokio::spawn(async move {
loop {
let _ = unclaimed_session_reaper(&state).await;
error!("unclaimed_session_reaper exited, restarting");
}
});
futures::future::join_all([handle1, handle2]).await;
Ok(())
}

/// Get all the session keys in unclaimed sessions - if any are older than 30 seconds,
/// enqueue them for deletion.
/// Session ids must be removed from unclaimed_sessions once a gameserver connection happens.
async fn unclaimed_session_reaper(state: &MatchmakerState) -> Result<(), async_nats::Error> {
// how often to check for orphaned sessions:
let mut interval = time::interval(Duration::from_millis(5000));
let kv = state.nats.kv_unclaimed_sessions();
loop {
interval.tick().await;
let mut keys = kv.keys().await?.boxed();
while let Some(key) = keys.try_next().await? {
let Ok(Some(entry)) = kv.entry(&key).await else {
continue;
};
let session_id = String::from_utf8(entry.value.to_vec())
.expect("Failed to convert session_id to string");
let age = OffsetDateTime::now_utc() - entry.created;
info!("* Session {session_id} is {age} old");
if age > Duration::from_secs(30) {
warn!("Unclaimed session {session_id} is older than 30 seconds = {age}");
// write to delete_sessions work queue and remove from unclaimed_sessions KV
state
.nats
.enqueue_session_delete(session_id.clone())
.await?;
kv.delete(&key).await?;
}
}
}
Ok(())
}

/// Deletes sessions once a gameserver removes the active_sessions KV entry.
/// this is the happy path, where there were no orphans..
async fn session_cleanup_watcher(state: &MatchmakerState) -> Result<(), async_nats::Error> {
let kv = state.nats.kv_active_connections();
let mut watcher = kv.watch(">").await?;
while let Some(event) = watcher.next().await {
info!("{event:?}");
match event {
Ok(event) => {
let session_id = event.key;
if event.operation == Operation::Delete {
info!("active_connection deleted, deleting session {session_id}",);
state
.nats
.enqueue_session_delete(session_id.clone())
.await?;
}
if event.operation == Operation::Put {
info!("New Session put {session_id}, deleting from unclaimed_sessions ");
// delete this session_id from unclaimed_sessions.
let _ = state
.nats
.kv_unclaimed_sessions()
.delete(session_id.as_str())
.await;
}
}
Err(e) => {
warn!("KV event error watching for session cleanup: {:?}", e);
}
}
}
Ok(())
}
Loading

0 comments on commit 1ba6e22

Please sign in to comment.