From 691c84749c3fa9f83f765497b629e7f6bcaebfd6 Mon Sep 17 00:00:00 2001 From: Danny Hadley Date: Thu, 9 Dec 2021 19:59:40 -0500 Subject: [PATCH] moving table index aggregation to background --- Cargo.lock | 6 +- workspace/bankah/Cargo.toml | 2 +- workspace/bankah/src/jobs.rs | 17 ++++++ workspace/bankah/src/state.rs | 7 +++ workspace/stickbot/Cargo.toml | 2 +- workspace/stickbot/src/bin/boxbot.rs | 17 +++--- workspace/stickbot/src/constants.rs | 4 +- workspace/stickbot/src/processors/admin.rs | 25 +++++++++ workspace/stickbot/src/processors/mod.rs | 1 + workspace/stickbot/src/routes/jobs.rs | 2 +- workspace/stickbot/src/routes/tables.rs | 64 +++++++++++++--------- workspace/stickbot/src/services.rs | 6 +- workspace/twowaiyo/Cargo.toml | 2 +- 13 files changed, 111 insertions(+), 44 deletions(-) create mode 100644 workspace/stickbot/src/processors/admin.rs diff --git a/Cargo.lock b/Cargo.lock index 10a0c60..96bc8fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -345,7 +345,7 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "bankah" -version = "0.1.0" +version = "0.2.0" dependencies = [ "bson", "chrono", @@ -2162,7 +2162,7 @@ checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" [[package]] name = "stickbot" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-std", "bankah", @@ -2511,7 +2511,7 @@ dependencies = [ [[package]] name = "twowaiyo" -version = "0.1.0" +version = "0.2.0" dependencies = [ "bankah", "dotenv", diff --git a/workspace/bankah/Cargo.toml b/workspace/bankah/Cargo.toml index 3eb1f62..12f652b 100644 --- a/workspace/bankah/Cargo.toml +++ b/workspace/bankah/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bankah" -version = "0.1.0" +version = "0.2.0" edition = "2018" [dependencies] diff --git a/workspace/bankah/src/jobs.rs b/workspace/bankah/src/jobs.rs index 033152e..da37304 100644 --- a/workspace/bankah/src/jobs.rs +++ b/workspace/bankah/src/jobs.rs @@ -22,10 +22,16 @@ pub struct JobWapper { pub attempts: u8, } +#[derive(Debug, Deserialize, Serialize, Clone)] +pub enum TableAdminJob { + ReindexPopulations, +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub enum TableJob { Bet(JobWapper), Roll(JobWapper), + Admin(JobWapper), } impl TableJob { @@ -33,6 +39,7 @@ impl TableJob { match self { TableJob::Bet(inner) => inner.id.clone(), TableJob::Roll(inner) => inner.id.clone(), + TableJob::Admin(inner) => inner.id.clone(), } } @@ -46,6 +53,15 @@ impl TableJob { } } + pub fn reindex() -> Self { + let id = uuid::Uuid::new_v4(); + TableJob::Admin(JobWapper { + job: TableAdminJob::ReindexPopulations, + id, + attempts: 0, + }) + } + pub fn roll(table: uuid::Uuid, version: uuid::Uuid) -> Self { let id = uuid::Uuid::new_v4(); let job = RollJob { table, version }; @@ -80,6 +96,7 @@ pub enum TableJobOutput { BetFailed(BetFailureReason), RollProcessed, RollStale, + AdminOk, } #[derive(Debug, Serialize)] diff --git a/workspace/bankah/src/state.rs b/workspace/bankah/src/state.rs index 57916b4..8cd54a1 100644 --- a/workspace/bankah/src/state.rs +++ b/workspace/bankah/src/state.rs @@ -44,6 +44,13 @@ impl Default for SeatState { } } +#[derive(Debug, Deserialize, Serialize)] +pub struct TableIndexState { + pub id: uuid::Uuid, + pub name: String, + // pub population: u32, +} + #[derive(Debug, Deserialize, Serialize)] pub struct TableState { pub id: uuid::Uuid, diff --git a/workspace/stickbot/Cargo.toml b/workspace/stickbot/Cargo.toml index 0c59b5e..055f18d 100644 --- a/workspace/stickbot/Cargo.toml +++ b/workspace/stickbot/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stickbot" -version = "0.1.0" +version = "0.2.0" edition = "2018" [lib] diff --git a/workspace/stickbot/src/bin/boxbot.rs b/workspace/stickbot/src/bin/boxbot.rs index 7348664..99e29c8 100644 --- a/workspace/stickbot/src/bin/boxbot.rs +++ b/workspace/stickbot/src/bin/boxbot.rs @@ -8,7 +8,7 @@ use bankah::jobs::TableJob; const POP_CMD: kramer::Command<&'static str, &'static str> = kramer::Command::List::<_, &str>(kramer::ListCommand::Pop( kramer::Side::Left, - stickbot::constants::STICKBOT_BETS_QUEUE, + stickbot::constants::STICKBOT_JOB_QUEUE, Some((None, 3)), )); @@ -60,12 +60,11 @@ async fn work(services: &stickbot::Services) -> Result<()> { log::debug!("deserialized job from queue - {:?}", job); - let (id, result) = match &job { - bankah::jobs::TableJob::Bet(inner) => (inner.id.clone(), stickbot::processors::bet(&services, &inner.job).await), - bankah::jobs::TableJob::Roll(inner) => ( - inner.id.clone(), - stickbot::processors::roll(&services, &inner.job).await, - ), + let id = job.id(); + let result = match &job { + TableJob::Admin(inner) => stickbot::processors::admin::reindex(&services, &inner.job).await, + TableJob::Bet(inner) => stickbot::processors::bet(&services, &inner.job).await, + TableJob::Roll(inner) => stickbot::processors::roll(&services, &inner.job).await, }; // Processors will return a Result, where `E` can either represent a "fatal" error that is non-retryable or @@ -82,7 +81,7 @@ async fn work(services: &stickbot::Services) -> Result<()> { // api route to push bet jobs onto our queue. let command = kramer::Command::List(kramer::ListCommand::Push( (kramer::Side::Right, kramer::Insertion::Always), - stickbot::constants::STICKBOT_BETS_QUEUE, + stickbot::constants::STICKBOT_JOB_RESULTS, kramer::Arity::One(serialized), )); @@ -97,7 +96,7 @@ async fn work(services: &stickbot::Services) -> Result<()> { // Insert into our results hash the output from the processor. let sets = kramer::Command::Hashes(kramer::HashCommand::Set( - stickbot::constants::STICKBOT_BET_RESULTS, + stickbot::constants::STICKBOT_JOB_RESULTS, kramer::Arity::One((&sid, output.as_str())), kramer::Insertion::Always, )); diff --git a/workspace/stickbot/src/constants.rs b/workspace/stickbot/src/constants.rs index 99a829c..63a55d8 100644 --- a/workspace/stickbot/src/constants.rs +++ b/workspace/stickbot/src/constants.rs @@ -25,8 +25,8 @@ pub const REDIS_HOSTNAME_ENV: &'static str = "STICKBOT_REDIS_HOSTNAME"; pub const REDIS_PORT_ENV: &'static str = "STICKBOT_REDIS_PORT"; pub const REDIS_PASSWORD_ENV: &'static str = "STICKBOT_REDIS_PASSWORD"; -pub const STICKBOT_BETS_QUEUE: &'static str = "stickbot:bets"; -pub const STICKBOT_BET_RESULTS: &'static str = "stickbot:bet_results"; +pub const STICKBOT_JOB_QUEUE: &'static str = "stickbot:jobs"; +pub const STICKBOT_JOB_RESULTS: &'static str = "stickbot:job_results"; pub const EMPTY_RESPONSE: &'static str = ""; diff --git a/workspace/stickbot/src/processors/admin.rs b/workspace/stickbot/src/processors/admin.rs new file mode 100644 index 0000000..d2ee040 --- /dev/null +++ b/workspace/stickbot/src/processors/admin.rs @@ -0,0 +1,25 @@ +use bankah::jobs::{JobError, TableAdminJob, TableJobOutput}; + +use crate::db::doc; +use crate::Services; + +pub async fn reindex<'a>(services: &Services, job: &TableAdminJob) -> Result { + log::info!("attempting to reindex table populations - {:?}", job); + let tables = services.tables(); + let pipeline = vec![ + doc! { "$project": { "id": 1, "name": 1, "seats": { "$objectToArray": "$seats" } } }, + doc! { "$project": { "id": 1, "name": 1, "population": { + "$map": { + "input": "$seats", + "as": "seat", + "in": ["$$seat.k", "$$seat.v.nickname"], + }, + } } }, + doc! { "$merge": { "into": crate::constants::MONGO_DB_TABLE_LIST_COLLECTION_NAME } }, + ]; + tables.aggregate(pipeline, None).await.map_err(|error| { + log::warn!("unable to perform aggregate - {}", error); + JobError::Retryable + })?; + Ok(TableJobOutput::AdminOk) +} diff --git a/workspace/stickbot/src/processors/mod.rs b/workspace/stickbot/src/processors/mod.rs index 4c56ebc..7da27c5 100644 --- a/workspace/stickbot/src/processors/mod.rs +++ b/workspace/stickbot/src/processors/mod.rs @@ -1,5 +1,6 @@ mod bets; mod rolls; +pub mod admin; pub use bets::bet; pub use rolls::roll; diff --git a/workspace/stickbot/src/routes/jobs.rs b/workspace/stickbot/src/routes/jobs.rs index 615b274..9146a99 100644 --- a/workspace/stickbot/src/routes/jobs.rs +++ b/workspace/stickbot/src/routes/jobs.rs @@ -27,7 +27,7 @@ pub async fn find(request: Request) -> Result { log::debug!("player '{}' checking on job '{}'", player.id, query.id); let command = kramer::Command::Hashes(kramer::HashCommand::Get::<_, &str>( - constants::STICKBOT_BET_RESULTS, + constants::STICKBOT_JOB_RESULTS, Some(kramer::Arity::One(query.id.as_str())), )); diff --git a/workspace/stickbot/src/routes/tables.rs b/workspace/stickbot/src/routes/tables.rs index c9eaf88..4dcff28 100644 --- a/workspace/stickbot/src/routes/tables.rs +++ b/workspace/stickbot/src/routes/tables.rs @@ -80,23 +80,26 @@ pub async fn list(request: Request) -> Result { .ok_or(Error::from_str(404, ""))?; log::trace!("listing tables for '{:?}'", player); - let collection = request.state().tables(); + let collection = request.state().table_index(); let mut tables = collection.find(None, None).await.map_err(|error| { log::warn!("unable to query tables - {}", error); Error::from_str(500, "load-tables") })?; - let mut page: Vec = Vec::with_capacity(10); + let mut page = Vec::with_capacity(10); while let Some(doc) = tables.next().await { if let Ok(state) = doc { page.push(state) } + + if page.len() >= 10 { + break; + } } - let body = Body::from_json(&page)?; - Ok(Response::builder(200).body(body).build()) + Body::from_json(&page).map(|body| Response::builder(200).body(body).build()) } #[derive(Debug, Deserialize)] @@ -201,11 +204,17 @@ pub async fn join(mut request: Request) -> Result { .map_err(|error| { log::warn!("unable to create new table - {:?}", error); Error::from_str(422, "failed") - }) - .and_then(|_r| { - log::info!("player joined table '{}'", ts.id); - Body::from_json(&ts).map(|body| Response::builder(200).body(body).build()) - }) + })?; + + // TODO: do we care if our attempt to enqueue a job fails from the web thread? + let job = bankah::jobs::TableJob::reindex(); + request.state().queue(&job).await.map(|_| ()).unwrap_or_else(|error| { + log::warn!("unable to queue indexing job - {}", error); + () + }); + + log::info!("player joined table '{}'", ts.id); + Body::from_json(&ts).map(|body| Response::builder(200).body(body).build()) } // ## Route @@ -264,22 +273,12 @@ pub async fn create(request: Request) -> Result { Error::from_str(422, "failed") }); - let pipeline = vec![ - doc! { "$project": { "id": 1, "name": 1, "seats": { "$objectToArray": "$seats" } } }, - doc! { "$project": { "id": 1, "population": { - "$reduce": { - "input": "$seats", - "initialValue": 0, - "in": { - "$add": ["$$value", 1] - } - }, - } } }, - doc! { "$merge": { "into": crate::constants::MONGO_DB_TABLE_LIST_COLLECTION_NAME } }, - ]; - let agg = tables.aggregate(pipeline, None).await.map(|_| String::from("ok")); - - log::warn!("aggregate result - {:?}", agg); + // TODO: do we care if our attempt to enqueue a job fails from the web thread? + let job = bankah::jobs::TableJob::reindex(); + request.state().queue(&job).await.map(|_| ()).unwrap_or_else(|error| { + log::warn!("unable to queue indexing job - {}", error); + () + }); result.and_then(|_r| { log::info!("new table created - '{}'", ts.id); @@ -343,6 +342,21 @@ pub async fn leave(mut request: Request) -> Result { log::trace!("player '{}' finished leaving", ps.id); + // TODO: do we care if our attempt to enqueue a job fails from the web thread? + let job = bankah::jobs::TableJob::reindex(); + request + .state() + .queue(&job) + .await + .map(|_| { + log::debug!("successfully queued indexing job"); + () + }) + .unwrap_or_else(|error| { + log::warn!("unable to queue indexing job - {}", error); + () + }); + Body::from_json(&ts).map(|body| Response::builder(200).body(body).build()) } diff --git a/workspace/stickbot/src/services.rs b/workspace/stickbot/src/services.rs index e9f3a09..9cec00c 100644 --- a/workspace/stickbot/src/services.rs +++ b/workspace/stickbot/src/services.rs @@ -43,6 +43,10 @@ impl Services { db.collection::(name.as_ref()) } + pub fn table_index(&self) -> db::Collection { + self.collection(constants::MONGO_DB_TABLE_LIST_COLLECTION_NAME) + } + pub fn tables(&self) -> db::Collection { self.collection(constants::MONGO_DB_TABLE_COLLECTION_NAME) } @@ -59,7 +63,7 @@ impl Services { let command = kramer::Command::List(kramer::ListCommand::Push( (kramer::Side::Right, kramer::Insertion::Always), - constants::STICKBOT_BETS_QUEUE, + constants::STICKBOT_JOB_QUEUE, kramer::Arity::One(serialized), )); diff --git a/workspace/twowaiyo/Cargo.toml b/workspace/twowaiyo/Cargo.toml index 7420a3a..03df85b 100644 --- a/workspace/twowaiyo/Cargo.toml +++ b/workspace/twowaiyo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "twowaiyo" -version = "0.1.0" +version = "0.2.0" edition = "2018" [lib]