Skip to content

Commit

Permalink
Merge pull request #1 from dadleyy/table-admin
Browse files Browse the repository at this point in the history
table admin background
  • Loading branch information
dadleyy authored Dec 10, 2021
2 parents 9afc947 + 691c847 commit 6e33ca0
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 54 deletions.
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion workspace/bankah/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bankah"
version = "0.1.0"
version = "0.2.0"
edition = "2018"

[dependencies]
Expand Down
17 changes: 17 additions & 0 deletions workspace/bankah/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,24 @@ pub struct JobWapper<T> {
pub attempts: u8,
}

/// Administrative work items that operate on a table as a whole rather than
/// on an individual bet or roll.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub enum TableAdminJob {
  // Rebuilds the denormalized table "index" collection (see `TableIndexState`).
  ReindexPopulations,
}

/// The unit of work pushed onto (and popped off of) the shared job queue.
/// Every variant wraps its payload in `JobWapper`, which carries the job id
/// and the attempt counter used for retries.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub enum TableJob {
  // A bet placed against a table.
  Bet(JobWapper<BetJob>),
  // A dice roll to be processed for a table.
  Roll(JobWapper<RollJob>),
  // Background/administrative maintenance work (e.g. population reindexing).
  Admin(JobWapper<TableAdminJob>),
}

impl TableJob {
/// Returns the unique id assigned to this job when it was created.
///
/// `uuid::Uuid` implements `Copy`, so the id is returned by value directly;
/// the `.clone()` calls the original carried were redundant.
pub fn id(&self) -> uuid::Uuid {
  match self {
    TableJob::Bet(inner) => inner.id,
    TableJob::Roll(inner) => inner.id,
    TableJob::Admin(inner) => inner.id,
  }
}

Expand All @@ -46,6 +53,15 @@ impl TableJob {
}
}

/// Constructs a fresh `Admin` job that will rebuild the table population
/// index, tagged with a newly generated id and a zeroed attempt counter.
pub fn reindex() -> Self {
  TableJob::Admin(JobWapper {
    id: uuid::Uuid::new_v4(),
    job: TableAdminJob::ReindexPopulations,
    attempts: 0,
  })
}

pub fn roll(table: uuid::Uuid, version: uuid::Uuid) -> Self {
let id = uuid::Uuid::new_v4();
let job = RollJob { table, version };
Expand Down Expand Up @@ -80,6 +96,7 @@ pub enum TableJobOutput {
BetFailed(BetFailureReason),
RollProcessed,
RollStale,
AdminOk,
}

#[derive(Debug, Serialize)]
Expand Down
7 changes: 7 additions & 0 deletions workspace/bankah/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ impl Default for SeatState {
}
}

/// A lightweight, denormalized view of a table kept in the table "index"
/// collection — populated by the reindex admin job so listing routes can
/// avoid loading full `TableState` documents.
#[derive(Debug, Deserialize, Serialize)]
pub struct TableIndexState {
  // Matches the id of the corresponding `TableState` document.
  pub id: uuid::Uuid,
  pub name: String,
  // pub population: u32, // NOTE(review): appears reserved for the population data produced by the reindex pipeline — confirm before enabling.
}

#[derive(Debug, Deserialize, Serialize)]
pub struct TableState {
pub id: uuid::Uuid,
Expand Down
2 changes: 1 addition & 1 deletion workspace/stickbot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "stickbot"
version = "0.1.0"
version = "0.2.0"
edition = "2018"

[lib]
Expand Down
17 changes: 8 additions & 9 deletions workspace/stickbot/src/bin/boxbot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use bankah::jobs::TableJob;
// Command used by the worker loop to pull pending jobs off the shared queue:
// a pop from the left side of the redis list named by `STICKBOT_JOB_QUEUE`.
// NOTE(review): `Some((None, 3))` presumably configures a blocking pop with a
// 3-second timeout — confirm against the `kramer` crate's `ListCommand::Pop` docs.
const POP_CMD: kramer::Command<&'static str, &'static str> =
  kramer::Command::List::<_, &str>(kramer::ListCommand::Pop(
    kramer::Side::Left,
    stickbot::constants::STICKBOT_JOB_QUEUE,
    Some((None, 3)),
  ));

Expand Down Expand Up @@ -60,12 +60,11 @@ async fn work(services: &stickbot::Services) -> Result<()> {

log::debug!("deserialized job from queue - {:?}", job);

let (id, result) = match &job {
bankah::jobs::TableJob::Bet(inner) => (inner.id.clone(), stickbot::processors::bet(&services, &inner.job).await),
bankah::jobs::TableJob::Roll(inner) => (
inner.id.clone(),
stickbot::processors::roll(&services, &inner.job).await,
),
let id = job.id();
let result = match &job {
TableJob::Admin(inner) => stickbot::processors::admin::reindex(&services, &inner.job).await,
TableJob::Bet(inner) => stickbot::processors::bet(&services, &inner.job).await,
TableJob::Roll(inner) => stickbot::processors::roll(&services, &inner.job).await,
};

// Processors will return a Result<E, T>, where `E` can either represent a "fatal" error that is non-retryable or
Expand All @@ -82,7 +81,7 @@ async fn work(services: &stickbot::Services) -> Result<()> {
// api route to push bet jobs onto our queue.
let command = kramer::Command::List(kramer::ListCommand::Push(
(kramer::Side::Right, kramer::Insertion::Always),
stickbot::constants::STICKBOT_BETS_QUEUE,
stickbot::constants::STICKBOT_JOB_RESULTS,
kramer::Arity::One(serialized),
));

Expand All @@ -97,7 +96,7 @@ async fn work(services: &stickbot::Services) -> Result<()> {

// Insert into our results hash the output from the processor.
let sets = kramer::Command::Hashes(kramer::HashCommand::Set(
stickbot::constants::STICKBOT_BET_RESULTS,
stickbot::constants::STICKBOT_JOB_RESULTS,
kramer::Arity::One((&sid, output.as_str())),
kramer::Insertion::Always,
));
Expand Down
4 changes: 2 additions & 2 deletions workspace/stickbot/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub const REDIS_HOSTNAME_ENV: &'static str = "STICKBOT_REDIS_HOSTNAME";
pub const REDIS_PORT_ENV: &'static str = "STICKBOT_REDIS_PORT";
pub const REDIS_PASSWORD_ENV: &'static str = "STICKBOT_REDIS_PASSWORD";

pub const STICKBOT_BETS_QUEUE: &'static str = "stickbot:bets";
pub const STICKBOT_BET_RESULTS: &'static str = "stickbot:bet_results";
pub const STICKBOT_JOB_QUEUE: &'static str = "stickbot:jobs";
pub const STICKBOT_JOB_RESULTS: &'static str = "stickbot:job_results";

pub const EMPTY_RESPONSE: &'static str = "";

Expand Down
25 changes: 25 additions & 0 deletions workspace/stickbot/src/processors/admin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use bankah::jobs::{JobError, TableAdminJob, TableJobOutput};

use crate::db::doc;
use crate::Services;

/// Rebuilds the denormalized table "list"/index collection from the primary
/// tables collection by running an aggregation pipeline server-side in mongo.
///
/// Returns `TableJobOutput::AdminOk` on success. Aggregation failures are
/// logged and mapped to `JobError::Retryable` so the queue re-attempts the job.
pub async fn reindex<'a>(services: &Services, job: &TableAdminJob) -> Result<TableJobOutput, JobError> {
  log::info!("attempting to reindex table populations - {:?}", job);
  let tables = services.tables();
  let pipeline = vec![
    // Flatten the `seats` map into an array of { k, v } entries so it can be mapped over.
    doc! { "$project": { "id": 1, "name": 1, "seats": { "$objectToArray": "$seats" } } },
    // Project each seat entry to a [seat-key, nickname] pair under `population`.
    doc! { "$project": { "id": 1, "name": 1, "population": {
      "$map": {
        "input": "$seats",
        "as": "seat",
        "in": ["$$seat.k", "$$seat.v.nickname"],
      },
    } } },
    // Write the projected documents into the table-list (index) collection.
    doc! { "$merge": { "into": crate::constants::MONGO_DB_TABLE_LIST_COLLECTION_NAME } },
  ];
  tables.aggregate(pipeline, None).await.map_err(|error| {
    log::warn!("unable to perform aggregate - {}", error);
    JobError::Retryable
  })?;
  Ok(TableJobOutput::AdminOk)
}
1 change: 1 addition & 0 deletions workspace/stickbot/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod bets;
mod rolls;

pub mod admin;
pub use bets::bet;
pub use rolls::roll;
2 changes: 1 addition & 1 deletion workspace/stickbot/src/routes/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn find(request: Request) -> Result {
log::debug!("player '{}' checking on job '{}'", player.id, query.id);

let command = kramer::Command::Hashes(kramer::HashCommand::Get::<_, &str>(
constants::STICKBOT_BET_RESULTS,
constants::STICKBOT_JOB_RESULTS,
Some(kramer::Arity::One(query.id.as_str())),
));

Expand Down
64 changes: 39 additions & 25 deletions workspace/stickbot/src/routes/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,26 @@ pub async fn list(request: Request) -> Result {
.ok_or(Error::from_str(404, ""))?;

log::trace!("listing tables for '{:?}'", player);
let collection = request.state().tables();
let collection = request.state().table_index();

let mut tables = collection.find(None, None).await.map_err(|error| {
log::warn!("unable to query tables - {}", error);
Error::from_str(500, "load-tables")
})?;

let mut page: Vec<TableState> = Vec::with_capacity(10);
let mut page = Vec::with_capacity(10);

while let Some(doc) = tables.next().await {
if let Ok(state) = doc {
page.push(state)
}

if page.len() >= 10 {
break;
}
}

let body = Body::from_json(&page)?;
Ok(Response::builder(200).body(body).build())
Body::from_json(&page).map(|body| Response::builder(200).body(body).build())
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -201,11 +204,17 @@ pub async fn join(mut request: Request) -> Result {
.map_err(|error| {
log::warn!("unable to create new table - {:?}", error);
Error::from_str(422, "failed")
})
.and_then(|_r| {
log::info!("player joined table '{}'", ts.id);
Body::from_json(&ts).map(|body| Response::builder(200).body(body).build())
})
})?;

// TODO: do we care if our attempt to enqueue a job fails from the web thread?
let job = bankah::jobs::TableJob::reindex();
request.state().queue(&job).await.map(|_| ()).unwrap_or_else(|error| {
log::warn!("unable to queue indexing job - {}", error);
()
});

log::info!("player joined table '{}'", ts.id);
Body::from_json(&ts).map(|body| Response::builder(200).body(body).build())
}

// ## Route
Expand Down Expand Up @@ -264,22 +273,12 @@ pub async fn create(request: Request) -> Result {
Error::from_str(422, "failed")
});

let pipeline = vec![
doc! { "$project": { "id": 1, "name": 1, "seats": { "$objectToArray": "$seats" } } },
doc! { "$project": { "id": 1, "population": {
"$reduce": {
"input": "$seats",
"initialValue": 0,
"in": {
"$add": ["$$value", 1]
}
},
} } },
doc! { "$merge": { "into": crate::constants::MONGO_DB_TABLE_LIST_COLLECTION_NAME } },
];
let agg = tables.aggregate(pipeline, None).await.map(|_| String::from("ok"));

log::warn!("aggregate result - {:?}", agg);
// TODO: do we care if our attempt to enqueue a job fails from the web thread?
let job = bankah::jobs::TableJob::reindex();
request.state().queue(&job).await.map(|_| ()).unwrap_or_else(|error| {
log::warn!("unable to queue indexing job - {}", error);
()
});

result.and_then(|_r| {
log::info!("new table created - '{}'", ts.id);
Expand Down Expand Up @@ -343,6 +342,21 @@ pub async fn leave(mut request: Request) -> Result {

log::trace!("player '{}' finished leaving", ps.id);

// TODO: do we care if our attempt to enqueue a job fails from the web thread?
let job = bankah::jobs::TableJob::reindex();
request
.state()
.queue(&job)
.await
.map(|_| {
log::debug!("successfully queued indexing job");
()
})
.unwrap_or_else(|error| {
log::warn!("unable to queue indexing job - {}", error);
()
});

Body::from_json(&ts).map(|body| Response::builder(200).body(body).build())
}

Expand Down
6 changes: 5 additions & 1 deletion workspace/stickbot/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl Services {
db.collection::<T>(name.as_ref())
}

/// Returns a handle to the mongo collection holding denormalized
/// `TableIndexState` documents (the listing/"index" view of tables).
pub fn table_index(&self) -> db::Collection<bankah::state::TableIndexState> {
  self.collection(constants::MONGO_DB_TABLE_LIST_COLLECTION_NAME)
}

/// Returns a handle to the primary mongo collection of full `TableState` documents.
pub fn tables(&self) -> db::Collection<bankah::state::TableState> {
  self.collection(constants::MONGO_DB_TABLE_COLLECTION_NAME)
}
Expand All @@ -59,7 +63,7 @@ impl Services {

let command = kramer::Command::List(kramer::ListCommand::Push(
(kramer::Side::Right, kramer::Insertion::Always),
constants::STICKBOT_BETS_QUEUE,
constants::STICKBOT_JOB_QUEUE,
kramer::Arity::One(serialized),
));

Expand Down
2 changes: 1 addition & 1 deletion workspace/twowaiyo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "twowaiyo"
version = "0.1.0"
version = "0.2.0"
edition = "2018"

[lib]
Expand Down

0 comments on commit 6e33ca0

Please sign in to comment.