diff --git a/.env.example b/.env.example index 5ed3ab3..8452e8a 100644 --- a/.env.example +++ b/.env.example @@ -20,3 +20,9 @@ STICKBOT_ADMIN_EMAILS="" STICKBOT_MAX_ACTIVE_TABLES_PER_PLAYER= BOXBOT_WORKER_DELAY=1000 + +STICKBOT_TABLE_COLLECTION="stickbot:tables" +STICKBOT_TABLE_LIST_COLLECTION="stickbot:table_list" +STICKBOT_PLAYER_COLECTION="stickbot:players" +STICKBOT_JOB_QUEUE="stickbot:jobs" +STICKBOT_JOB_RESULTS="stickbot:job_results" diff --git a/Cargo.lock b/Cargo.lock index 839389e..117af14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,7 +62,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom 0.2.3", + "getrandom 0.2.4", "once_cell", "version_check", ] @@ -345,7 +345,7 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "bankah" -version = "0.3.4" +version = "0.4.0" dependencies = [ "bson", "chrono", @@ -563,9 +563,9 @@ checksum = "dcb25d077389e53838a8158c8e99174c5a9d902dee4904320db714f3c653ffba" [[package]] name = "crossbeam-utils" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120" dependencies = [ "cfg-if 1.0.0", "lazy_static", @@ -622,9 +622,9 @@ dependencies = [ [[package]] name = "curl" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bc6d233563261f8db6ffb83bbaad5a73837a6e6b28868e926337ebbdece0be3" +checksum = "7de97b894edd5b5bcceef8b78d7da9b75b1d2f2f9a910569d0bde3dd31d84939" dependencies = [ "curl-sys", "libc", @@ -637,9 +637,9 @@ dependencies = [ [[package]] name = "curl-sys" -version = "0.4.51+curl-7.80.0" +version = "0.4.52+curl-7.81.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d130987e6a6a34fe0889e1083022fa48cd90e6709a84be3fb8dd95801de5af20" +checksum = "14b8c2d1023ea5fded5b7b892e4b8e95f70038a421126a056761a84246a28971" dependencies = [ "cc", "libc", @@ -935,9 +935,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" dependencies = [ "cfg-if 1.0.0", "libc", @@ -1127,9 +1127,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" dependencies = [ "autocfg", "hashbrown", @@ -1491,9 +1491,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl-probe" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" @@ -1510,9 +1510,9 @@ dependencies = [ [[package]] name = "os_info" -version = "3.0.9" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b89dd55b8d8d97dabd0d1adc625d188378fcf87632825bfe9c956acc9a11a72a" +checksum = "198e392be7e882f0c2836f425e430f81d9a0e99651e4646311347417cddbfd43" dependencies = [ "log", "winapi", @@ -1739,7 +1739,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom 0.2.3", + "getrandom 0.2.4", ] [[package]] @@ -2078,9 +2078,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "socket2" @@ -2178,7 +2178,7 @@ checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" [[package]] name = "stickbot" -version = "0.3.4" +version = "0.4.0" dependencies = [ "async-std", "bankah", @@ -2186,7 +2186,7 @@ dependencies = [ "chrono", "dotenv", "env_logger", - "getrandom 0.2.3", + "getrandom 0.2.4", "http-types", "jsonwebtoken", "kramer", @@ -2233,7 +2233,7 @@ dependencies = [ "cfg-if 1.0.0", "encoding_rs", "futures-util", - "getrandom 0.2.3", + "getrandom 0.2.4", "http-client", "http-types", "log", @@ -2526,12 +2526,12 @@ dependencies = [ [[package]] name = "twowaiyo" -version = "0.3.4" +version = "0.4.0" dependencies = [ "bankah", "dotenv", "env_logger", - "getrandom 0.2.3", + "getrandom 0.2.4", "log", "uuid", ] @@ -2624,7 +2624,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ - "getrandom 0.2.3", + "getrandom 0.2.4", "serde", ] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..99ece8e --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2022 Danny Hadley + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/workspace/bankah/Cargo.toml b/workspace/bankah/Cargo.toml index 53c7e43..0ae76ff 100644 --- a/workspace/bankah/Cargo.toml +++ b/workspace/bankah/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bankah" -version = "0.3.4" +version = "0.4.0" edition = "2018" [dependencies] diff --git a/workspace/bankah/src/jobs.rs b/workspace/bankah/src/jobs.rs index 9a8f6e4..dfcc546 100644 --- a/workspace/bankah/src/jobs.rs +++ b/workspace/bankah/src/jobs.rs @@ -2,6 +2,7 @@ use crate::state::BetState; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct BetJob { pub bet: BetState, pub player: uuid::Uuid, @@ -10,18 +11,28 @@ pub struct BetJob { } #[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct RollJob { pub table: uuid::Uuid, pub version: uuid::Uuid, } #[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct JobWapper { pub job: T, pub id: uuid::Uuid, pub attempts: u8, } +impl JobWapper { + pub fn wrap(job: T) -> Self { + let id = uuid::Uuid::new_v4(); + let attempts = 0u8; + Self { job, attempts, id } + } +} + impl JobWapper { pub fn retry(self) -> Self { let JobWapper { job, id, attempts } = self; @@ -34,15 +45,20 @@ impl JobWapper { } #[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "snake_case")] pub enum TableAdminJob { ReindexPopulations, CleanupPlayerData(String), } #[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "snake_case")] pub enum TableJob { Bet(JobWapper), Roll(JobWapper), + Sit(JobWapper<(String, String)>), + Create(JobWapper), + Stand(JobWapper<(String, String)>), Admin(JobWapper), } @@ -51,10 +67,21 @@ impl TableJob { match self { TableJob::Bet(inner) => inner.id.clone(), TableJob::Roll(inner) => inner.id.clone(), + TableJob::Sit(inner) => inner.id.clone(), + TableJob::Create(inner) => inner.id.clone(), + TableJob::Stand(inner) => inner.id.clone(), TableJob::Admin(inner) => inner.id.clone(), } } + pub fn sit(table: String, player: String) -> Self { + TableJob::Sit(JobWapper::wrap((table, player))) + } + + pub fn stand(table: String, player: String) -> Self { + TableJob::Stand(JobWapper::wrap((table, player))) + } + pub fn admin(job: TableAdminJob) -> Self { let id = uuid::Uuid::new_v4(); TableJob::Admin(JobWapper { job, id, attempts: 0 }) @@ -95,6 +122,7 @@ impl TableJob { } #[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum BetFailureReason { InsufficientFunds, InvalidComeBet, @@ -104,6 +132,7 @@ pub enum BetFailureReason { } #[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum TableJobOutput { BetProcessed, BetStale, @@ -111,9 +140,41 @@ pub enum TableJobOutput { RollProcessed, RollStale, AdminOk, + StandOk, + FinalStandOk, + SitOk, + TableCreated(String), +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct JobResult { + completed: Option>, + output: Option, + id: uuid::Uuid, +} + +impl JobResult { + pub fn empty(id: uuid::Uuid) -> Self { + return Self { + id, + completed: None, + output: None, + }; + } + + pub fn wrap(id: uuid::Uuid, inner: T) -> Self { + let completed = Some(chrono::Utc::now()); + Self { + output: Some(inner), + id, + completed, + } + } } #[derive(Debug, Serialize)] +#[serde(rename_all = "snake_case")] pub enum JobError { Retryable, Terminal(String), diff --git a/workspace/bankah/src/lib.rs b/workspace/bankah/src/lib.rs index 17f21ef..1d3370f 100644 --- a/workspace/bankah/src/lib.rs +++ b/workspace/bankah/src/lib.rs @@ -1,2 +1,10 @@ +use serde::Serialize; + pub mod jobs; pub mod state; + +#[derive(Debug, Serialize)] +pub struct JobResponse { + pub job: uuid::Uuid, + pub output: Option, +} diff --git a/workspace/stickbot/Cargo.toml b/workspace/stickbot/Cargo.toml index 2019e40..1337efb 100644 --- a/workspace/stickbot/Cargo.toml +++ b/workspace/stickbot/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stickbot" -version = "0.3.4" +version = "0.4.0" edition = "2018" [lib] diff --git a/workspace/stickbot/README.md b/workspace/stickbot/README.md index ae4918d..982a7e7 100644 --- a/workspace/stickbot/README.md +++ b/workspace/stickbot/README.md @@ -19,6 +19,11 @@ with these services. Once the environment variables have been prepared, the main web process and background worker can be started using the appropriate `cargo` aliases: +``` +$ cargo stickbot <- web thread +$ cargo boxbot <- background worker +``` + [rust]: https://www.rust-lang.org/ [twowaiyo]: https://github.com/dadleyy/twowaiyo diff --git a/workspace/stickbot/src/bin/boxbot.rs b/workspace/stickbot/src/bin/boxbot.rs index 57e5c4b..89c87e0 100644 --- a/workspace/stickbot/src/bin/boxbot.rs +++ b/workspace/stickbot/src/bin/boxbot.rs @@ -5,62 +5,15 @@ use stickbot; use bankah::jobs::{TableAdminJob, TableJob}; -const POP_CMD: kramer::Command<&'static str, &'static str> = - kramer::Command::List::<_, &str>(kramer::ListCommand::Pop( - kramer::Side::Left, - stickbot::constants::STICKBOT_JOB_QUEUE, - Some((None, 3)), - )); - -fn response_string(response: &kramer::ResponseValue) -> Option { - match response { - kramer::ResponseValue::String(inner) => Some(inner.clone()), - res => { - log::warn!("strange response from job queue - {:?}", res); - None - } - } -} - -fn parse_pop(response: &kramer::ResponseValue) -> Option { - response_string(&response).and_then(|contents| serde_json::from_str::(&contents).ok()) -} - -async fn pop_next(services: &stickbot::Services) -> Result> { - let result = match services.command(&POP_CMD).await { - Err(error) => { - log::warn!("unable to pop from bet queue - {}", error); - return Err(Error::new(ErrorKind::Other, format!("{}", error))); - } - Ok(kramer::Response::Item(kramer::ResponseValue::Empty)) => { - log::trace!("empty response from queue, moving on"); - return Ok(None); - } - Ok(kramer::Response::Array(values)) => values, - Ok(kramer::Response::Error) => { - log::warn!("unable to pop from queue - redis error"); - return Err(Error::new(ErrorKind::Other, "invalid-response")); - } - Ok(kramer::Response::Item(inner)) => { - log::warn!("unknown response from pop - '{:?}'", inner); - return Err(Error::new(ErrorKind::Other, format!("{:?}", inner))); - } - }; - - log::trace!("result from pop - {:?}, attempting to deserialize", result); - - Ok(result.get(1).and_then(parse_pop)) -} - async fn work(services: &stickbot::Services) -> Result<()> { - let job = match pop_next(&services).await? { + let job = match services.pop().await? { Some(job) => job, None => return Ok(()), }; log::debug!("deserialized job from queue - {:?}", job); - let id = job.id(); + let id = job.id().to_string(); let result = match &job { TableJob::Admin(inner) => match &inner.job { TableAdminJob::ReindexPopulations => stickbot::processors::admin::reindex(&services, &inner.job).await, @@ -68,39 +21,45 @@ async fn work(services: &stickbot::Services) -> Result<()> { }, TableJob::Bet(inner) => stickbot::processors::bet(&services, &inner.job).await, TableJob::Roll(inner) => stickbot::processors::roll(&services, &inner.job).await, + TableJob::Sit(inner) => stickbot::processors::sit(&services, &inner.job).await, + TableJob::Create(inner) => stickbot::processors::create(&services, &inner.job).await, + TableJob::Stand(inner) => stickbot::processors::stand(&services, &inner.job).await, }; + let serialized = result + .map(|inner| bankah::jobs::JobResult::wrap(job.id(), inner)) + .and_then(|out| { + serde_json::to_string(&out).map_err(|error| { + log::warn!("unable to serialze job output - {}", error); + bankah::jobs::JobError::Terminal(format!("unable to serialze job output - {}", error)) + }) + }); + // Processors will return a Result, where `E` can either represent a "fatal" error that is non-retryable or // an error that is retryable. If the job is retryable, re-enqueue. - let output = match result { - Ok(output) => serde_json::to_string(&output)?, + let output = match serialized { + Ok(output) => output, Err(bankah::jobs::JobError::Retryable) => { let retry = job.retry().ok_or(Error::new(ErrorKind::Other, "no-retryable"))?; - - log::warn!("job failed, but is retryable, re-adding back to the queue"); - let serialized = serde_json::to_string(&retry)?; - - // TODO: consider refactoring this command construction into a reusable place; it is used here and in our bets - // api route to push bet jobs onto our queue. - let command = kramer::Command::List(kramer::ListCommand::Push( - (kramer::Side::Right, kramer::Insertion::Always), - stickbot::constants::STICKBOT_JOB_RESULTS, - kramer::Arity::One(serialized), - )); - - return services.command(&command).await.map(|_| ()); + services.queue(&retry).await.map_err(|error| { + log::warn!("unable to persist retry into queue - {}", error); + error + })?; + log::debug!("job '{}' scheduled for retry", retry.id()); + return Ok(()); } Err(bankah::jobs::JobError::Terminal(error)) => return Err(Error::new(ErrorKind::Other, error)), }; log::debug!("job '{}' processed - {}", id, output); - let sid = id.to_string(); + // TODO(service-coagulation): consider `services.finalize(&output)`? + let storage = format!("{}", stickbot::env::JobStore::Results); // Insert into our results hash the output from the processor. let sets = kramer::Command::Hashes(kramer::HashCommand::Set( - stickbot::constants::STICKBOT_JOB_RESULTS, - kramer::Arity::One((&sid, output.as_str())), + &storage, + kramer::Arity::One((&id, output.as_str())), kramer::Insertion::Always, )); diff --git a/workspace/stickbot/src/env.rs b/workspace/stickbot/src/env.rs new file mode 100644 index 0000000..b8b6c58 --- /dev/null +++ b/workspace/stickbot/src/env.rs @@ -0,0 +1,47 @@ +use crate::constants; + +/* TODO: this is a way to share the runtime computation of environment specific storage names. The alternative is to + * compute this once during the initialization of `Services`. + * + */ + +#[derive(Debug, Clone)] +pub enum JobStore { + Queue, + Results, +} + +impl std::fmt::Display for JobStore { + fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + let v = match &self { + JobStore::Queue => std::env::var("STICKBOT_JOB_QUEUE").unwrap_or(constants::STICKBOT_JOB_QUEUE.to_string()), + JobStore::Results => std::env::var("STICKBOT_JOB_RESULTS").unwrap_or(constants::STICKBOT_JOB_RESULTS.to_string()), + }; + + write!(formatter, "{}", v) + } +} + +#[derive(Debug, Clone)] +pub enum Collection { + TableList, + Tables, + Players, +} + +impl std::fmt::Display for Collection { + fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + let v = match &self { + Collection::Players => { + std::env::var("STICKBOT_PLAYER_COLLECTION").unwrap_or(constants::MONGO_DB_PLAYER_COLLECTION_NAME.to_string()) + } + Collection::Tables => { + std::env::var("STICKBOT_TABLE_COLLECTION").unwrap_or(constants::MONGO_DB_TABLE_COLLECTION_NAME.to_string()) + } + Collection::TableList => std::env::var("STICKBOT_TABLE_LIST_COLLECTION") + .unwrap_or(constants::MONGO_DB_TABLE_LIST_COLLECTION_NAME.to_string()), + }; + + write!(formatter, "{}", v) + } +} diff --git a/workspace/stickbot/src/lib.rs b/workspace/stickbot/src/lib.rs index 7f2d19f..7b11a85 100644 --- a/workspace/stickbot/src/lib.rs +++ b/workspace/stickbot/src/lib.rs @@ -5,7 +5,7 @@ mod services; mod web; pub mod constants; +pub mod env; pub mod processors; pub mod routes; - pub use services::Services; diff --git a/workspace/stickbot/src/processors/admin.rs b/workspace/stickbot/src/processors/admin.rs index b1986da..980a40d 100644 --- a/workspace/stickbot/src/processors/admin.rs +++ b/workspace/stickbot/src/processors/admin.rs @@ -8,6 +8,8 @@ use crate::Services; pub async fn reindex(services: &Services, job: &TableAdminJob) -> Result { log::info!("attempting to reindex table populations - {:?}", job); let tables = services.tables(); + let destination = format!("{}", crate::env::Collection::TableList); + let pipeline = vec![ doc! { "$project": { "id": 1, "name": 1, "seats": { "$objectToArray": "$seats" } } }, doc! { "$project": { "id": 1, "name": 1, "population": { @@ -17,7 +19,7 @@ pub async fn reindex(services: &Services, job: &TableAdminJob) -> Result Result { + services + .players() + .find_one(crate::db::doc! { "id": id }, None) + .await + .map_err(|error| { + log::warn!("unable to query player - {}", error); + JobError::Terminal(format!("unable to query players - {}", error)) + })? + .ok_or_else(|| { + log::warn!("unable to find player"); + JobError::Terminal(format!("unable to find player '{}'", id)) + }) +} + +fn sit_player(mut ts: TableState, mut ps: PlayerState) -> std::io::Result<(TableState, PlayerState)> { + let mut player = Player::from(&ps); + + let table = Table::from(&ts).sit(&mut player); + let next = TableState::from(&table); + + ps.balance = player.balance; + + ps.tables = ps.tables.drain(0..).chain(Some(ts.id.to_string())).collect(); + + ts.roller = next.roller; + + ts.seats = next + .seats + .into_iter() + .map(|(uuid, seat)| { + let original = ts.seats.remove(&uuid); + let mut current = original.unwrap_or(seat); + + let nickname = match uuid == ps.id { + true => ps.nickname.clone(), + false => current.nickname.clone(), + }; + + current.nickname = nickname; + + (uuid, current) + }) + .collect(); + + Ok((ts, ps)) +} + +fn stand_player(mut ts: TableState, mut ps: PlayerState) -> std::io::Result<(TableState, PlayerState)> { + let mut player = Player::from(&ps); + let table = Table::from(&ts); + log::trace!("before player stand - {:?} {:?}", table, player); + + let table = table.stand(&mut player); + let next = TableState::from(&table); + + if next.seats.contains_key(&ps.id) == false { + ps.tables = ps.tables.drain(0..).filter(|id| id != &ts.id.to_string()).collect(); + } + + log::trace!("new player state - {:?}", player); + + ps.balance = player.balance; + ts.roller = next.roller; + + ts.seats = next + .seats + .into_iter() + .filter_map(|(uuid, mut state)| { + let original = ts.seats.remove(&uuid); + + original.map(|orig| { + state.nickname = orig.nickname; + (uuid, state) + }) + }) + .collect(); + + Ok((ts, ps)) +} + +pub async fn create(services: &crate::Services, pid: &String) -> Result { + let player = find_player(&services, &pid).await?; + let name = crate::names::generate().map_err(|error| { + log::warn!("unable to generate random name - {}", error); + JobError::Retryable + })?; + let blank = TableState::with_name(name); + log::debug!("creating blank table - {:?}", blank); + let (table, player) = sit_player(blank, player).map_err(|error| { + log::warn!("logic error while sitting player '{}' at new table - {}", pid, error); + JobError::Terminal("".into()) + })?; + + let query = crate::db::doc! { "id": player.id.to_string() }; + let updates = crate::db::doc! { "$set": { "balance": player.balance, "tables": player.tables } }; + let opts = crate::db::FindOneAndUpdateOptions::builder() + .return_document(crate::db::ReturnDocument::After) + .build(); + + services + .players() + .find_one_and_update(query, updates, opts) + .await + .map_err(|error| { + log::warn!("unable to update player balance after create - {}", error); + JobError::Terminal(format!("failed player balance update - {}", error)) + })? + .ok_or_else(|| { + log::warn!("no player to update"); + JobError::Terminal(format!("missing player '{}' during balance update", pid)) + })?; + + log::debug!("inserting new table {:?}", table); + + // TODO(mongo-uuid): we're using a `find_one_and_replace` w/ the upsert call here to circumvent the serialization + // discrepency between insertion and find/replace methods on the mongodb driver collection. + let tops = crate::db::FindOneAndReplaceOptions::builder().upsert(true).build(); + services + .tables() + .find_one_and_replace(crate::db::lookup_for_uuid(&table.id), &table, Some(tops)) + .await + .map_err(|error| { + log::warn!("unable to create new table - {:?}", error); + JobError::Terminal(format!("missing player '{}' during balance update", pid)) + })?; + + // TODO: do we care if our attempt to enqueue a job fails from the web thread? + services + .queue(&bankah::jobs::TableJob::admin(TableAdminJob::ReindexPopulations)) + .await + .map(|_| ()) + .unwrap_or_else(|error| { + log::warn!("unable to queue indexing job - {}", error); + () + }); + + Ok(TableJobOutput::TableCreated(table.id.to_string())) +} + +pub async fn sit(services: &crate::Services, entities: &(String, String)) -> Result { + let (tid, pid) = entities; + log::debug!("player '{}' is sitting down to table '{}'", pid, tid); + + let tables = services.tables(); + let players = services.players(); + + let player = players + .find_one(crate::db::doc! { "id": pid }, None) + .await + .map_err(|error| { + log::warn!("unable to query player - {}", error); + JobError::Terminal(format!("unable to query players - {}", error)) + })? + .ok_or_else(|| { + log::warn!("unable to find player"); + JobError::Terminal(format!("unable to find player '{}'", pid)) + })?; + + let state = tables + .find_one(crate::db::doc! { "id": tid }, None) + .await + .map_err(|error| { + log::warn!("unable to find table - {}", error); + JobError::Terminal(format!("unable to query tables - {}", error)) + })? + .ok_or_else(|| { + log::warn!("unable to find table '{}'", tid); + JobError::Terminal(format!("unable to find table '{}'", tid)) + })?; + + let (ts, ps) = sit_player(state, player).map_err(|error| { + log::warn!("logic error sitting player - {}", error); + JobError::Terminal(format!("unable to sit player '{}'", error)) + })?; + + let opts = crate::db::FindOneAndUpdateOptions::builder() + .return_document(crate::db::ReturnDocument::After) + .build(); + + // TODO(player-id): another example of player id peristence mismatch. + players + .find_one_and_update( + crate::db::doc! { "id": pid }, + crate::db::doc! { "$set": { "balance": ps.balance, "tables": ps.tables } }, + opts, + ) + .await + .map_err(|error| { + log::warn!("unable to update player balance after join - {}", error); + JobError::Terminal(format!("unable to update player '{}'", error)) + })? + .ok_or_else(|| { + log::warn!("no player balance updated"); + JobError::Terminal(format!("player '{}' not found, no update applied", pid)) + })?; + + tables + .find_one_and_replace(crate::db::doc! { "id": tid }, &ts, None) + .await + .map_err(|error| { + log::warn!("unable to create new table - {:?}", error); + JobError::Terminal(format!("table '{}' not updated - {}", tid, error)) + })?; + + // TODO: do we care if our attempt to enqueue a job fails from the web thread? + let job = bankah::jobs::TableJob::admin(TableAdminJob::ReindexPopulations); + services.queue(&job).await.map(|_| ()).unwrap_or_else(|error| { + log::warn!("unable to queue indexing job - {}", error); + () + }); + + log::info!("player joined table '{}'", ts.id); + + return Ok(TableJobOutput::SitOk); +} + +pub async fn stand(services: &crate::Services, entities: &(String, String)) -> Result { + let (tid, pid) = entities; + log::debug!("player '{}' is leaving table '{}'", pid, tid); + + let tables = services.tables(); + let players = services.players(); + + let player = players + .find_one(crate::db::doc! { "id": pid }, None) + .await + .map_err(|error| { + log::warn!("unable to query player - {}", error); + JobError::Terminal(format!("unable to query tables - {}", error)) + })? + .ok_or_else(|| { + log::warn!("unable to find player"); + JobError::Terminal(format!("unable to find player '{}'", pid)) + })?; + + let table = tables + .find_one(crate::db::doc! { "id": tid }, None) + .await + .map_err(|error| { + log::warn!("unable to find table - {}", error); + JobError::Terminal(format!("unable to query tables - {}", error)) + })? + .ok_or_else(|| { + log::warn!("unable to find table"); + JobError::Terminal(format!("unable to find table '{}'", tid)) + })?; + + let (table, player) = stand_player(table, player).map_err(|error| { + log::warn!("unable to stand player - '{}'", error); + JobError::Terminal(format!("logic error while standing player - '{}'", pid)) + })?; + + players + .update_one( + crate::db::doc! { "id": player.id.to_string() }, + crate::db::doc! { "$set": { "balance": player.balance, "tables": player.tables } }, + None, + ) + .await + .map_err(|error| { + log::warn!("unable to persist new player balance - {}", error); + JobError::Retryable + })?; + + if table.seats.len() == 0 { + log::debug!("table '{}' is now empty, deleting", table.id); + + tables + .delete_one(crate::db::doc! { "id": tid }, None) + .await + .map_err(|error| { + log::warn!("unable to persist table updates - {}", error); + JobError::Retryable + })?; + + services + .table_index() + .delete_one(crate::db::doc! { "id": tid }, None) + .await + .map_err(|error| { + log::warn!("unable to persist table updates - {}", error); + JobError::Retryable + })?; + + log::debug!("table '{}' cleanup complete", tid); + + return Ok(TableJobOutput::FinalStandOk); + } else { + tables + .replace_one(crate::db::doc! { "id": tid }, &table, None) + .await + .map_err(|error| { + log::warn!("unable to persist table updates - {}", error); + JobError::Retryable + })?; + } + + log::debug!("table save, applying new player state for '{}'", player.id); + + log::debug!("player '{}' updated, reindexing populations", player.id); + + let job = bankah::jobs::TableJob::admin(TableAdminJob::ReindexPopulations); + services.queue(&job).await.map_err(|error| { + log::warn!("unable to queue reindexing job - {}", error); + JobError::Terminal(format!("unable to queue reindex - {}", error)) + })?; + + return Ok(TableJobOutput::StandOk); +} + +#[cfg(test)] +mod test { + use super::stand_player; + use bankah::state::{PlayerState, TableState}; + use twowaiyo::{Bet, Player, Table}; + + #[test] + fn test_stand_with_remaining() { + let table = Table::with_dice(vec![2, 2].into_iter()); + let mut player = Player::with_balance(200); + let table = table + .sit(&mut player) + .bet(&player, &Bet::start_pass(100)) + .unwrap() + .roll() + .table; + + let ps = PlayerState::from(&player); + assert_eq!(ps.balance, 0); + let (ts, ps) = stand_player(TableState::from(&table), ps).unwrap(); + assert_eq!(ps.balance, 100, "some balance was returned"); + assert_eq!( + ts.seats.get(&ps.id).map(|s| s.balance), + Some(0), + "the balance was zeroed" + ); + assert_eq!( + ts.seats.get(&ps.id).map(|s| s.bets.len()), + Some(1), + "there is one bet left" + ); + } +} diff --git a/workspace/stickbot/src/routes/jobs.rs b/workspace/stickbot/src/routes/jobs.rs index 9146a99..40e2683 100644 --- a/workspace/stickbot/src/routes/jobs.rs +++ b/workspace/stickbot/src/routes/jobs.rs @@ -1,6 +1,7 @@ -use serde::{Deserialize, Serialize}; +use serde::Deserialize; + +use bankah::jobs::{JobResult, TableJobOutput}; -use crate::constants; use crate::web::{cookie as get_cookie, Body, Error, Request, Response, Result}; #[derive(Debug, Deserialize)] @@ -8,12 +9,6 @@ pub struct JobLookupQuery { id: String, } -#[derive(Debug, Serialize)] -pub struct JobLookupResponse { - id: String, - output: Option, -} - pub async fn find(request: Request) -> Result { let cookie = get_cookie(&request).ok_or(Error::from_str(404, "no-cook"))?; let query = request.query::()?; @@ -25,9 +20,10 @@ pub async fn find(request: Request) -> Result { .ok_or(Error::from_str(404, ""))?; log::debug!("player '{}' checking on job '{}'", player.id, query.id); + let storage = format!("{}", crate::env::JobStore::Results); let command = kramer::Command::Hashes(kramer::HashCommand::Get::<_, &str>( - constants::STICKBOT_JOB_RESULTS, + storage.as_str(), Some(kramer::Arity::One(query.id.as_str())), )); @@ -40,12 +36,14 @@ pub async fn find(request: Request) -> Result { kramer::Response::Item(kramer::ResponseValue::String(inner)) => inner.clone(), kramer::Response::Item(kramer::ResponseValue::Empty) => { log::debug!("nothing in job result store for '{}' yet", query.id); - - return Body::from_json(&JobLookupResponse { - id: query.id.clone(), - output: None, - }) - .map(|bod| Response::builder(200).body(bod).build()); + return uuid::Uuid::parse_str(&query.id) + .map_err(|error| { + log::warn!("unable to parse input as uuid - '{}'", error); + Error::from_str(422, "invalid-id") + }) + .map(|uuid| JobResult::empty(uuid) as JobResult) + .and_then(|res| Body::from_json(&res)) + .map(|bod| Response::builder(200).body(bod).build()); } other => { log::warn!("strange response from job lookup - {:?}", other); @@ -55,18 +53,15 @@ pub async fn find(request: Request) -> Result { log::debug!("response from result lookup - {:?}", payload); - let parsed = serde_json::from_str::(&payload).map_err(|error| { + let parsed = serde_json::from_str::>(&payload).map_err(|error| { log::warn!("unable to parse job output - {}", error); Error::from_str(500, "bad-parse") })?; - Body::from_json(&JobLookupResponse { - id: query.id.clone(), - output: Some(parsed), - }) - .map(|bod| Response::builder(200).body(bod).build()) - .map_err(|error| { - log::warn!("unable to serialize job lookup - {}", error); - error - }) + Body::from_json(&parsed) + .map(|bod| Response::builder(200).body(bod).build()) + .map_err(|error| { + log::warn!("unable to serialize job lookup - {}", error); + error + }) } diff --git a/workspace/stickbot/src/routes/tables.rs b/workspace/stickbot/src/routes/tables.rs index ff37f91..a386bb0 100644 --- a/workspace/stickbot/src/routes/tables.rs +++ b/workspace/stickbot/src/routes/tables.rs @@ -1,87 +1,34 @@ use async_std::stream::StreamExt; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -use crate::db::{doc, FindOneAndReplaceOptions, FindOneAndUpdateOptions, ReturnDocument}; +use crate::constants; +use crate::db::doc; use crate::web::{cookie as get_cookie, Body, Error, Request, Response, Result}; -use crate::{constants, names}; -use bankah::state::{PlayerState, TableState}; -use twowaiyo::{Player, Table}; +use bankah::jobs::TableJob; +use bankah::state::PlayerState; -fn can_join(ps: &PlayerState) -> bool { +#[derive(Debug, Serialize)] +enum JoinFailure { + TooManyActiveTables, + InsufficientFunds, +} + +fn join_failure(ps: &PlayerState) -> Option { let max = std::env::var(constants::STICKBOT_MAX_ACTIVE_TABLES_PER_PLAYER_ENV) .ok() .and_then(|v| v.parse::().ok()) .unwrap_or(constants::STICKBOT_DEFAULT_MAX_ACTIVE_TABLES_PER_PLAYER); - return ps.tables.len() < max; -} - -// During conversion between our `twowaiyo` engine types, sever fields will be lost that need to be updated with the -// correct state. -fn sit_player(mut ts: TableState, mut ps: PlayerState) -> Result<(TableState, PlayerState)> { - let mut player = Player::from(&ps); - - let table = Table::from(&ts).sit(&mut player); - let next = TableState::from(&table); - - ps.balance = player.balance; - - ps.tables = ps.tables.drain(0..).chain(Some(ts.id.to_string())).collect(); - - ts.roller = next.roller; - - ts.seats = next - .seats - .into_iter() - .map(|(uuid, seat)| { - let original = ts.seats.remove(&uuid); - let mut current = original.unwrap_or(seat); - - let nickname = match uuid == ps.id { - true => ps.nickname.clone(), - false => current.nickname.clone(), - }; - - current.nickname = nickname; - - (uuid, current) - }) - .collect(); - - Ok((ts, ps)) -} - -fn stand_player(mut ts: TableState, mut ps: PlayerState) -> Result<(TableState, PlayerState)> { - let mut player = Player::from(&ps); - let table = Table::from(&ts); - log::trace!("before player stand - {:?} {:?}", table, player); - - let table = table.stand(&mut player); - let next = TableState::from(&table); - - if next.seats.contains_key(&ps.id) == false { - ps.tables = ps.tables.drain(0..).filter(|id| id != &ts.id.to_string()).collect(); + if ps.tables.len() >= max { + return Some(JoinFailure::TooManyActiveTables); } - log::trace!("new player state - {:?}", player); - - ps.balance = player.balance; - - ts.seats = next - .seats - .into_iter() - .filter_map(|(uuid, mut state)| { - let original = ts.seats.remove(&uuid); - - original.map(|orig| { - state.nickname = orig.nickname; - (uuid, state) - }) - }) - .collect(); + if (ps.balance > 0) != true { + return Some(JoinFailure::InsufficientFunds); + } - Ok((ts, ps)) + return None; } // ## Route @@ -170,72 +117,18 @@ pub async fn join(mut request: Request) -> Result { .and_then(|auth| auth.player()) .ok_or(Error::from_str(404, "no-player"))?; - if can_join(&player) != true { - let body = Body::from_string("too-many-active-seats".into()); + if let Some(reason) = join_failure(&player) { + let body = Body::from_string(format!("{:?}", reason)); return Ok(Response::builder(422).body(body).build()); } - if player.balance == 0 { - let body = Body::from_string("no-balance".into()); - return Ok(Response::builder(422).body(body).build()); - } - - let tables = request.state().tables(); - let players = request.state().players(); - let lookup = crate::db::lookup_for_uuid(&query.id); - - let state = tables - .find_one(lookup, None) - .await - .map_err(|error| { - log::warn!("unable to find table - {}", error); - Error::from_str(500, "lookup") - })? - .ok_or_else(|| { - log::warn!("unable to find table {:?}", crate::db::lookup_for_uuid(&query.id)); - Error::from_str(404, "no-table") - })?; - - let (ts, ps) = sit_player(state, player)?; - - let opts = FindOneAndUpdateOptions::builder() - .return_document(ReturnDocument::After) - .build(); - - // TODO(player-id): another example of player id peristence mismatch. - players - .find_one_and_update( - doc! { "id": ps.id.to_string() }, - doc! { "$set": { "balance": ps.balance, "tables": ps.tables } }, - opts, - ) - .await - .map_err(|error| { - log::warn!("unable to update player balance after join - {}", error); - Error::from_str(400, "balance-after-join-update") - })? - .ok_or_else(|| { - log::warn!("no player balance updated"); - Error::from_str(400, "balance-after-join-update") - })?; - - tables - .find_one_and_replace(crate::db::lookup_for_uuid(&query.id), &ts, None) - .await - .map_err(|error| { - log::warn!("unable to create new table - {:?}", error); - Error::from_str(422, "failed") - })?; - - // TODO: do we care if our attempt to enqueue a job fails from the web thread? - let job = bankah::jobs::TableJob::reindex(); - request.state().queue(&job).await.map(|_| ()).unwrap_or_else(|error| { - log::warn!("unable to queue indexing job - {}", error); - () - }); - - log::info!("player joined table '{}'", ts.id); - Body::from_json(&ts).map(|body| Response::builder(200).body(body).build()) + let job = TableJob::sit(query.id.to_string(), player.id.to_string()); + let id = request.state().queue(&job).await.map_err(|error| { + log::warn!("unable to queue sit job - {}", error); + error + })?; + let res = bankah::JobResponse { job: id, output: None }; + Body::from_json(&res).map(|body| Response::builder(200).body(body).build()) } // ## Route @@ -249,67 +142,18 @@ pub async fn create(request: Request) -> Result { .and_then(|auth| auth.player()) .ok_or(Error::from_str(404, "no-player"))?; - if can_join(&player) != true { - let body = Body::from_string("too-many-active-seats".into()); + if let Some(reason) = join_failure(&player) { + let body = Body::from_string(format!("{:?}", reason)); return Ok(Response::builder(422).body(body).build()); } - let tables = request.state().tables(); - let players = request.state().players(); - - let name = names::generate().map_err(|error| { - log::warn!("unable to generate random name - {}", error); - Error::from_str(500, "name-generation") + let job = TableJob::Create(bankah::jobs::JobWapper::wrap(player.id.to_string())); + let id = request.state().queue(&job).await.map_err(|error| { + log::warn!("unable to queue table creation job - '{}'", error); + error })?; - let (ts, ps) = sit_player(TableState::with_name(name), player)?; - - // TODO(player-id): when players are initially inserted into the players collection, their `uuid::Uuid` `id` field - // is being serialized and persisted as a string. Without the explicit `to_string` here, the query attempts to - // search for a binary match of the `uuid:Uuid`. - let query = doc! { "id": ps.id.to_string() }; - let updates = doc! { "$set": { "balance": ps.balance, "tables": ps.tables } }; - let opts = FindOneAndUpdateOptions::builder() - .return_document(ReturnDocument::After) - .build(); - - log::info!("creating table for user {:?} ({:?})", ps.id, query); - - players - .find_one_and_update(query, updates, opts) - .await - .map_err(|error| { - log::warn!("unable to update player balance after create - {}", error); - Error::from_str(500, "failed balance update on table creation") - })? - .ok_or_else(|| { - log::warn!("no player to update"); - Error::from_str(500, "failed balance update on table creation") - })?; - - log::debug!("inserting new table {:?}", ts); - - // TODO(mongo-uuid): we're using a `find_one_and_replace` w/ the upsert call here to circumvent the serialization - // discrepency between insertion and find/replace methods on the mongodb driver collection. - let tops = FindOneAndReplaceOptions::builder().upsert(true).build(); - let result = tables - .find_one_and_replace(crate::db::lookup_for_uuid(&ts.id), &ts, Some(tops)) - .await - .map_err(|error| { - log::warn!("unable to create new table - {:?}", error); - Error::from_str(422, "failed") - }); - - // TODO: do we care if our attempt to enqueue a job fails from the web thread? - let job = bankah::jobs::TableJob::reindex(); - request.state().queue(&job).await.map(|_| ()).unwrap_or_else(|error| { - log::warn!("unable to queue indexing job - {}", error); - () - }); - - result.and_then(|_r| { - log::info!("new table created - '{}'", ts.id); - Body::from_json(&ts).map(|body| Response::builder(200).body(body).build()) - }) + let res = bankah::JobResponse { job: id, output: None }; + Body::from_json(&res).map(|body| Response::builder(200).body(body).build()) } // ## Route @@ -328,97 +172,16 @@ pub async fn leave(mut request: Request) -> Result { .player() .ok_or(Error::from_str(404, "no-player"))?; - // TODO(table-id): Unlike player ids, it looks like the peristed, serialized bson data for tables uses a binary - // representation for the `uuid:Uuid` type. - let search = crate::db::lookup_for_uuid(&query.id); - log::debug!("user '{}' leaving table '{}' ({:?})", player.id, query.id, search); - - let tables = request.state().tables(); - let players = request.state().players(); + log::debug!("user '{}' leaving table '{}'", player.id, query.id); + let job = TableJob::stand(query.id.to_string(), player.id.to_string()); - let state = tables - .find_one(search.clone(), None) - .await - .map_err(|error| { - log::warn!("unable to find table - {}", error); - error - })? - .ok_or_else(|| { - log::warn!("unable to find table"); - Error::from_str(404, "table-missing") - })?; - - let (ts, ps) = stand_player(state, player)?; - - log::debug!("table save, applying player balance - {:?}", ps); - - tables.replace_one(search, &ts, None).await.map_err(|error| { - log::warn!("unable to persist table updates - {}", error); + let id = request.state().queue(&job).await.map_err(|error| { + log::warn!("unable to queue stand job - {}", error); error })?; - players - .update_one( - doc! { "id": ps.id.to_string() }, - doc! { "$set": { "balance": ps.balance, "tables": ps.tables } }, - None, - ) - .await - .map_err(|error| { - log::warn!("unable to persist new player balance - {}", error); - error - })?; + log::debug!("job '{}' queued", id); - log::trace!("player '{}' finished leaving", ps.id); - - // TODO: do we care if our attempt to enqueue a job fails from the web thread? - let job = bankah::jobs::TableJob::reindex(); - request - .state() - .queue(&job) - .await - .map(|_| { - log::debug!("successfully queued indexing job"); - () - }) - .unwrap_or_else(|error| { - log::warn!("unable to queue indexing job - {}", error); - () - }); - - Body::from_json(&ts).map(|body| Response::builder(200).body(body).build()) -} - -#[cfg(test)] -mod test { - use super::stand_player; - use bankah::state::{PlayerState, TableState}; - use twowaiyo::{Bet, Player, Table}; - - #[test] - fn test_stand_with_remaining() { - let table = Table::with_dice(vec![2, 2].into_iter()); - let mut player = Player::with_balance(200); - let table = table - .sit(&mut player) - .bet(&player, &Bet::start_pass(100)) - .unwrap() - .roll() - .table; - - let ps = PlayerState::from(&player); - assert_eq!(ps.balance, 0); - let (ts, ps) = stand_player(TableState::from(&table), ps).unwrap(); - assert_eq!(ps.balance, 100, "some balance was returned"); - assert_eq!( - ts.seats.get(&ps.id).map(|s| s.balance), - Some(0), - "the balance was zeroed" - ); - assert_eq!( - ts.seats.get(&ps.id).map(|s| s.bets.len()), - Some(1), - "there is one bet left" - ); - } + let res = bankah::JobResponse { job: id, output: None }; + Body::from_json(&res).map(|body| Response::builder(200).body(body).build()) } diff --git a/workspace/stickbot/src/services.rs b/workspace/stickbot/src/services.rs index f81b5d3..6bc2576 100644 --- a/workspace/stickbot/src/services.rs +++ b/workspace/stickbot/src/services.rs @@ -28,6 +28,20 @@ async fn connect_redis() -> Result { Ok(redis) } +fn response_string(response: &kramer::ResponseValue) -> Option { + match response { + kramer::ResponseValue::String(inner) => Some(inner.clone()), + res => { + log::warn!("strange response from job queue - {:?}", res); + None + } + } +} + +fn parse_pop(response: &kramer::ResponseValue) -> Option { + response_string(&response).and_then(|contents| serde_json::from_str::(&contents).ok()) +} + #[derive(Clone)] pub struct Services { db: db::Client, @@ -44,15 +58,47 @@ impl Services { } pub fn table_index(&self) -> db::Collection { - self.collection(constants::MONGO_DB_TABLE_LIST_COLLECTION_NAME) + self.collection(&format!("{}", crate::env::Collection::TableList)) } pub fn tables(&self) -> db::Collection { - self.collection(constants::MONGO_DB_TABLE_COLLECTION_NAME) + self.collection(&format!("{}", crate::env::Collection::Tables)) } pub fn players(&self) -> db::Collection { - self.collection(constants::MONGO_DB_PLAYER_COLLECTION_NAME) + self.collection(&format!("{}", crate::env::Collection::Players)) + } + + pub async fn pop(&self) -> Result> { + let cmd = kramer::Command::List::<_, String>(kramer::ListCommand::Pop( + kramer::Side::Left, + crate::env::JobStore::Queue, + Some((None, 3)), + )); + + let result = match self.command(&cmd).await { + Err(error) => { + log::warn!("unable to pop from bet queue - {}", error); + return Err(Error::new(ErrorKind::Other, format!("{}", error))); + } + Ok(kramer::Response::Item(kramer::ResponseValue::Empty)) => { + log::debug!("empty response from queue, moving on"); + return Ok(None); + } + Ok(kramer::Response::Array(values)) => values, + Ok(kramer::Response::Error) => { + log::warn!("unable to pop from queue - redis error"); + return Err(Error::new(ErrorKind::Other, "invalid-response")); + } + Ok(kramer::Response::Item(inner)) => { + log::warn!("unknown response from pop - '{:?}'", inner); + return Err(Error::new(ErrorKind::Other, format!("{:?}", inner))); + } + }; + + log::debug!("result from pop - {:?}, attempting to deserialize", result); + + Ok(result.get(1).and_then(parse_pop)) } pub async fn queue(&self, job: &bankah::jobs::TableJob) -> Result { @@ -63,11 +109,14 @@ impl Services { let command = kramer::Command::List(kramer::ListCommand::Push( (kramer::Side::Right, kramer::Insertion::Always), - constants::STICKBOT_JOB_QUEUE, + crate::env::JobStore::Queue, kramer::Arity::One(serialized), )); - self.command(&command).await.map(|_| job.id()) + self.command(&command).await.map(|result| { + log::debug!("executed queue command - {:?}", result); + job.id() + }) } pub async fn authority(&self, token: T) -> Option diff --git a/workspace/twowaiyo/Cargo.toml b/workspace/twowaiyo/Cargo.toml index 54ede17..dba7e24 100644 --- a/workspace/twowaiyo/Cargo.toml +++ b/workspace/twowaiyo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "twowaiyo" -version = "0.3.4" +version = "0.4.0" edition = "2018" [lib]