diff --git a/Cargo.lock b/Cargo.lock index 9ec10da9..6d7e1dc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,26 +91,25 @@ dependencies = [ [[package]] name = "async-nats" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1174495e436c928905018f10a36160f7a8a6786450f50f4ce7fba05d1539704c" +checksum = "94e3e851ddf3b62be8a8085e1e453968df9cdbf990a37bbb589b5b4f587c68d7" dependencies = [ - "async-nats-tokio-rustls-deps", - "base64 0.13.1", - "base64-url", + "base64 0.21.2", "bytes", "futures", "http", "itoa 1.0.8", "memchr", "nkeys", - "nuid", + "nuid 0.3.2", "once_cell", "rand", "regex", "ring", "rustls-native-certs", "rustls-pemfile", + "rustls-webpki", "serde", "serde_json", "serde_nanos", @@ -119,21 +118,11 @@ dependencies = [ "time 0.3.22", "tokio", "tokio-retry", + "tokio-rustls 0.24.1", "tracing", "url", ] -[[package]] -name = "async-nats-tokio-rustls-deps" -version = "0.24.0-ALPHA.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdefe54cd7867d937c0a507d2a3a830af410044282cd3e4002b5b7860e1892e" -dependencies = [ - "rustls 0.21.2", - "tokio", - "webpki 0.22.0", -] - [[package]] name = "async-trait" version = "0.1.70" @@ -264,15 +253,6 @@ version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" -[[package]] -name = "base64-url" -version = "1.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" -dependencies = [ - "base64 0.13.1", -] - [[package]] name = "base64ct" version = "1.1.1" @@ -337,6 +317,9 @@ name = "bytes" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -571,29 +554,6 @@ dependencies = [ "const-oid", ] -[[package]] -name = "diesel" -version = "1.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b28135ecf6b7d446b43e27e225622a038cc4e2930a1022f51cdb97ada19b8e4d" -dependencies = [ - "bitflags 1.3.2", - "byteorder", - "diesel_derives", - "pq-sys", -] - -[[package]] -name = "diesel_derives" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "difflib" version = "0.4.0" @@ -813,8 +773,8 @@ dependencies = [ "svc-agent", "svc-authn", "svc-authz", - "svc-conference-events", - "svc-error", + "svc-error 0.6.0", + "svc-events", "svc-nats-client", "svc-utils", "tokio", @@ -1552,9 +1512,9 @@ dependencies = [ [[package]] name = "nkeys" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e66a7cd1358277b2a6f77078e70aea7315ff2f20db969cc61153103ec162594" +checksum = "c2d151f6ece2f3d1077f6c779268de2516653d8344ddde65addd785cce764fe5" dependencies = [ "byteorder", "data-encoding", @@ -1601,6 +1561,16 @@ dependencies = [ "rand", ] +[[package]] +name = "nuid" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b61b1710432e483e6a67b20b6c60c6afe0e2fad67aabba3bdb912f3f70ff6ae" +dependencies = [ + "once_cell", + "rand", +] + [[package]] name = "num-bigint" version = "0.2.6" @@ -1947,15 +1917,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pq-sys" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" -dependencies = [ - "vcpkg", -] - [[package]] name = "predicates" version = "2.1.5" @@ -2247,8 +2208,8 @@ dependencies = [ "pollster", "thiserror", "tokio", - "tokio-rustls", - "webpki 0.21.4", + "tokio-rustls 0.22.0", + "webpki", ] [[package]] @@ -2397,14 +2358,14 @@ dependencies = [ "log", "ring", "sct 0.6.1", - "webpki 0.21.4", + "webpki", ] [[package]] name = "rustls" -version = "0.21.2" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e32ca28af694bc1bbf399c33a516dbdf1c90090b8ab23c2bc24f834aa2247f5f" +checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" dependencies = [ "log", "ring", @@ -2435,9 +2396,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.100.1" +version = "0.101.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" +checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" dependencies = [ "ring", "untrusted", @@ -3064,9 +3025,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "svc-agent" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c90205083c87c6bc25990d5cdd26f98d8c431441e86dbd4e7421b28c428e04d" +checksum = "bef9c610b65f24bab61b52f5a7264983d0e014858e8496282aec7d688baf7869" dependencies = [ "async-channel", "base64 0.21.2", @@ -3089,7 +3050,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb5cf659f78c8fff863c17ac4e674829517919716eeecab602e8d2941e89c111" dependencies = [ "chrono", - "diesel", "http", "jsonwebtoken", "serde", @@ -3119,21 +3079,23 @@ dependencies = [ ] [[package]] -name = "svc-conference-events" -version = "0.2.0" +name = "svc-error" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9185e72a1486f9edf5d66e03a13e8e5cc514e1a579506ce4954b14c2c62a783" +checksum = "3f841d7fd45d6f179e9f3765491fcb5eea100a5bbe50ea47faf3f262031966d9" dependencies = [ + "anyhow", + "crossbeam-channel", + "http", "serde", - "serde_json", - "thiserror", + "serde_derive", ] [[package]] name = "svc-error" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f841d7fd45d6f179e9f3765491fcb5eea100a5bbe50ea47faf3f262031966d9" +checksum = "ad220c6bc89bc2e7b8af01db6dcfa4a513e18d78e7cf2f778e623ac22577eadf" dependencies = [ "anyhow", "crossbeam-channel", @@ -3149,31 +3111,49 @@ dependencies = [ "svc-authz", ] +[[package]] +name = "svc-events" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ad84bd15a598b693df7dd08ca832c3414d59d6847f134f479ff547264669735" +dependencies = [ + "serde", + "serde_json", + "svc-agent", + "svc-authn", + "thiserror", + "uuid", +] + [[package]] name = "svc-nats-client" -version = "0.2.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab57232e87e0c5c4d1ae2b810ffc971cdefd2cdf011d7c2a678cdc40a1a20c51" +checksum = "cf7705838936003cae1b79e726be255ea9702b8aad516dec9c998c7c93ef6f8d" dependencies = [ "anyhow", "async-nats", "async-trait", "futures", + "futures-util", "humantime-serde", + "nuid 0.4.1", "reqwest", "serde", "svc-agent", - "svc-error", + "svc-error 0.6.0", + "svc-events", "thiserror", + "tokio", "tracing", "uuid", ] [[package]] name = "svc-utils" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb63daf177825295faa99773d6fb6de26be0d3e9fbe427e3cac794358c000e99" +checksum = "b8443737f4d2444dc9e6a140a83d720685290d1ad763e75b59421958fa4a1a96" dependencies = [ "axum", "futures", @@ -3183,7 +3163,7 @@ dependencies = [ "prometheus", "svc-agent", "svc-authn", - "svc-error", + "svc-error 0.5.0", "tokio", "tower", "tower-http", @@ -3324,12 +3304,11 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.29.1" +version = "1.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" dependencies = [ "autocfg", - "backtrace", "bytes", "libc", "mio", @@ -3382,7 +3361,17 @@ checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.1", "tokio", - "webpki 0.21.4", + "webpki", +] + +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.5", + "tokio", ] [[package]] @@ -3799,16 +3788,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "whoami" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 271107de..74b0072b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ crossbeam-channel = "0.5" enum-iterator = "1.4" futures = "0.3" futures-channel = "0.3" -futures-util = "0.3" +futures-util = "0.3.28" http = "0.2" humantime-serde = "1.1" hyper = { version = "0.14", features = [ "server" ] } @@ -34,13 +34,13 @@ serde_qs = "0.12" signal-hook = "0.3" signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] } sqlx = { version = "0.6", features = ["offline", "postgres", "macros", "uuid", "chrono", "json", "bigdecimal", "runtime-tokio-native-tls"] } -svc-agent = { version = "0.20", features = ["sqlx", "queue-counter"] } +svc-agent = { version = "0.21", features = ["sqlx", "queue-counter"] } svc-authn = { version = "0.8", features = ["jose", "sqlx"] } svc-authz = { version = "0.12" } -svc-error = { version = "0.5", features = ["sqlx", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] } -svc-utils = { version = "0.7", features = ["authn-extractor", "cors-middleware", "log-middleware", "metrics-middleware"] } -svc-nats-client = { version = "0.2" } -svc-conference-events = { version = "0.2" } +svc-error = { version = "0.6", features = ["sqlx", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] } +svc-utils = { version = "0.8", features = ["authn-extractor", "cors-middleware", "log-middleware", "metrics-middleware"] } +svc-nats-client = "0.8" +svc-events = "0.11" tokio = { version = "1.28", features = ["full"] } tower = "0.4" tower-http = { version = "0.4", features = ["trace", "cors"] } diff --git a/chart/Chart.yaml b/chart/Chart.yaml index e08a4eec..28c4e221 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -16,4 +16,4 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.2.11 +version: 0.2.12 diff --git a/chart/templates/app-cm.yaml b/chart/templates/app-cm.yaml index c9fc4ce2..1713f809 100644 --- a/chart/templates/app-cm.yaml +++ b/chart/templates/app-cm.yaml @@ -33,10 +33,10 @@ data: [nats] url = {{ .url | quote }} creds = {{ .creds | quote }} - subscribe.stream = {{ .subscribe.stream | quote }} - subscribe.consumer = {{ .subscribe.consumer | quote }} - subscribe.batch = {{ .subscribe.batch }} - subscribe.idle_heartbeat = {{ .subscribe.idle_heartbeat | quote }} + subscribe_durable.stream = {{ .subscribe.stream | quote }} + subscribe_durable.consumer = {{ .subscribe.consumer | quote }} + subscribe_durable.batch = {{ .subscribe.batch }} + subscribe_durable.idle_heartbeat = {{ .subscribe.idle_heartbeat | quote }} {{- end }} {{- with .Values.nats_consumer }} diff --git a/docker/Dockerfile b/docker/Dockerfile index 630296ad..773f1aa3 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,7 +1,7 @@ ## ----------------------------------------------------------------------------- ## Build ## ----------------------------------------------------------------------------- -FROM rust:1.68.0-slim-buster as build-stage +FROM rust:1.70.0-slim-buster as build-stage RUN apt update && apt install -y --no-install-recommends \ pkg-config \ diff --git a/docker/migration.dockerfile b/docker/migration.dockerfile index 81e56577..92d1123f 100644 --- a/docker/migration.dockerfile +++ b/docker/migration.dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.68.0-slim-buster +FROM rust:1.70.0-slim-buster RUN apt update && apt install -y --no-install-recommends \ pkg-config \ @@ -6,7 +6,7 @@ RUN apt update && apt install -y --no-install-recommends \ libcurl4-openssl-dev \ libpq-dev -RUN cargo install sqlx-cli --version 0.5.7 --no-default-features --features postgres +RUN cargo install sqlx-cli --version 0.6.3 --no-default-features --features native-tls,postgres WORKDIR /app CMD ["cargo", "sqlx", "migrate", "run"] COPY ./migrations /app/migrations diff --git a/src/app/context.rs b/src/app/context.rs index 0d9e80bb..d5bdbbaf 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -7,6 +7,7 @@ use sqlx::pool::PoolConnection; use sqlx::postgres::{PgPool as Db, Postgres}; use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::cache::ConnectionPool as RedisConnectionPool; +use svc_nats_client::NatsClient; use crate::config::Config; use crate::{ @@ -22,7 +23,7 @@ use super::broker_client::BrokerClient; pub trait Context: GlobalContext + MessageContext {} #[async_trait] -pub trait GlobalContext: Sync { +pub trait GlobalContext { fn authz(&self) -> &Authz; fn config(&self) -> &Config; fn db(&self) -> &Db; @@ -33,6 +34,7 @@ pub trait GlobalContext: Sync { fn metrics(&self) -> Arc; fn s3_client(&self) -> Option; fn broker_client(&self) -> &dyn BrokerClient; + fn nats_client(&self) -> Option>; async fn get_conn(&self) -> Result, AppError> { self.db() @@ -51,7 +53,7 @@ pub trait GlobalContext: Sync { } } -pub trait MessageContext: Send { +pub trait MessageContext { fn start_timestamp(&self) -> DateTime; } @@ -69,6 +71,7 @@ pub struct AppContext { metrics: Arc, s3_client: Option, broker_client: Arc, + nats_client: Option>, } impl AppContext { @@ -117,6 +120,10 @@ impl GlobalContext for AppContext { fn broker_client(&self) -> &dyn BrokerClient { self.broker_client.as_ref() } + + fn nats_client(&self) -> Option> { + self.nats_client.clone() + } } /////////////////////////////////////////////////////////////////////////////// @@ -175,6 +182,10 @@ impl<'a, C: GlobalContext> GlobalContext for AppMessageContext<'a, C> { fn broker_client(&self) -> &dyn BrokerClient { self.global_context.broker_client() } + + fn nats_client(&self) -> Option> { + self.global_context.nats_client() + } } impl<'a, C: GlobalContext> MessageContext for AppMessageContext<'a, C> { @@ -196,6 +207,7 @@ pub struct AppContextBuilder { agent_id: AgentId, queue_counter: Option, redis_pool: Option, + nats_client: Option>, } impl AppContextBuilder { @@ -211,6 +223,7 @@ impl AppContextBuilder { agent_id, queue_counter: None, redis_pool: None, + nats_client: None, } } @@ -235,6 +248,13 @@ impl AppContextBuilder { } } + pub fn add_nats_client(self, nats_client: impl NatsClient + 'static) -> Self { + Self { + nats_client: Some(Arc::new(nats_client)), + ..self + } + } + pub fn build(self, metrics: Arc) -> AppContext { AppContext { config: Arc::new(self.config), @@ -247,6 +267,7 @@ impl AppContextBuilder { redis_pool: self.redis_pool, metrics, s3_client: S3Client::new(), + nats_client: self.nats_client, } } } diff --git a/src/app/endpoint/agent.rs b/src/app/endpoint/agent.rs index 8a2093ce..4c60c061 100644 --- a/src/app/endpoint/agent.rs +++ b/src/app/endpoint/agent.rs @@ -61,7 +61,7 @@ pub struct ListHandler; impl RequestHandler for ListHandler { type Payload = ListRequest; - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -173,7 +173,7 @@ impl RequestHandler for UpdateHandler { type Payload = UpdateRequest; #[instrument(skip_all, fields(scope, room_id, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/ban.rs b/src/app/endpoint/ban.rs index 673f4bbd..e39000d9 100644 --- a/src/app/endpoint/ban.rs +++ b/src/app/endpoint/ban.rs @@ -42,7 +42,7 @@ pub struct ListHandler; impl RequestHandler for ListHandler { type Payload = ListRequest; - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id }: Self::Payload, reqp: RequestParams<'_>, @@ -61,7 +61,7 @@ impl RequestHandler for ListHandler { room.audience().into(), reqp.as_account_id().to_owned(), object, - "update".into(), + "read".into(), ) .await?; @@ -132,7 +132,7 @@ mod tests { authz.allow( agent.account_id(), vec!["classrooms", &room.classroom_id().to_string()], - "update", + "read", ); let mut context = TestContext::new(db, authz); @@ -162,15 +162,7 @@ mod tests { shared_helpers::insert_room(&mut conn).await }; - let mut authz = TestAuthz::new(); - let classroom_id = room.classroom_id().to_string(); - authz.allow( - agent.account_id(), - vec!["classrooms", &classroom_id], - "read", - ); - - let mut context = TestContext::new(db, authz); + let mut context = TestContext::new(db, TestAuthz::new()); let payload = ListRequest { room_id: room.id() }; diff --git a/src/app/endpoint/change/create.rs b/src/app/endpoint/change/create.rs index 39fa2a41..7ab48347 100644 --- a/src/app/endpoint/change/create.rs +++ b/src/app/endpoint/change/create.rs @@ -48,7 +48,7 @@ impl RequestHandler for CreateHandler { scope, room_id, classroom_id, change_id ) )] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/change/delete.rs b/src/app/endpoint/change/delete.rs index 297fc425..14e5efd1 100644 --- a/src/app/endpoint/change/delete.rs +++ b/src/app/endpoint/change/delete.rs @@ -48,7 +48,7 @@ impl RequestHandler for DeleteHandler { scope, room_id, classroom_id, edition_id ) )] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/change/list.rs b/src/app/endpoint/change/list.rs index c63eb91f..d5817221 100644 --- a/src/app/endpoint/change/list.rs +++ b/src/app/endpoint/change/list.rs @@ -50,7 +50,7 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(edition_id, scope, room_id, classroom_id, change_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/edition/commit.rs b/src/app/endpoint/edition/commit.rs index 1006f8aa..c2d73eb4 100644 --- a/src/app/endpoint/edition/commit.rs +++ b/src/app/endpoint/edition/commit.rs @@ -56,7 +56,7 @@ impl RequestHandler for CommitHandler { type Payload = CommitRequest; #[instrument(skip_all, fields(edition_id, offset, room_id, scope, classroom_id,))] - async fn handle( + async fn handle( context: &mut C, CommitRequest { id, diff --git a/src/app/endpoint/edition/create.rs b/src/app/endpoint/edition/create.rs index 97810e70..e62d0120 100644 --- a/src/app/endpoint/edition/create.rs +++ b/src/app/endpoint/edition/create.rs @@ -46,7 +46,7 @@ impl RequestHandler for CreateHandler { scope, classroom_id, edition_id ) )] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/edition/delete.rs b/src/app/endpoint/edition/delete.rs index 491ac204..abb9b2cc 100644 --- a/src/app/endpoint/edition/delete.rs +++ b/src/app/endpoint/edition/delete.rs @@ -46,7 +46,7 @@ impl RequestHandler for DeleteHandler { room_id, scope, classroom_id ) )] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/edition/list.rs b/src/app/endpoint/edition/list.rs index 96e1d965..45e5b2b3 100644 --- a/src/app/endpoint/edition/list.rs +++ b/src/app/endpoint/edition/list.rs @@ -50,7 +50,7 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/event.rs b/src/app/endpoint/event.rs index d8fc3bbf..6953d0af 100644 --- a/src/app/endpoint/event.rs +++ b/src/app/endpoint/event.rs @@ -85,7 +85,7 @@ impl RequestHandler for CreateHandler { type Payload = CreateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -355,7 +355,7 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/helpers.rs b/src/app/endpoint/helpers.rs index 527eb0f1..45be482c 100644 --- a/src/app/endpoint/helpers.rs +++ b/src/app/endpoint/helpers.rs @@ -40,7 +40,7 @@ pub enum RoomTimeRequirement { Open, } -pub async fn find_room( +pub async fn find_room( context: &mut C, id: Uuid, opening_requirement: RoomTimeRequirement, diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index 81d67030..f1e3519f 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -23,7 +23,7 @@ pub type MqttResult = StdResult; pub trait RequestHandler { type Payload: Send + DeserializeOwned; - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, @@ -32,7 +32,7 @@ pub trait RequestHandler { macro_rules! request_routes { ($($m: pat => $h: ty),*) => { - pub async fn route_request( + pub async fn route_request( context: &mut C, request: &IncomingRequest, ) -> Option { @@ -102,7 +102,7 @@ pub trait ResponseHandler { pub trait EventHandler { type Payload: Send + DeserializeOwned; - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, evp: &IncomingEventProperties, @@ -112,7 +112,7 @@ pub trait EventHandler { macro_rules! event_routes { ($($l: pat => $h: ty),*) => { #[allow(unused_variables)] - pub async fn route_event( + pub async fn route_event( context: &mut C, event: &IncomingEvent, ) -> Option { diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index f11fd8d9..bd4f6758 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -68,7 +68,7 @@ impl RequestHandler for CreateHandler { type Payload = CreateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, @@ -194,7 +194,7 @@ impl RequestHandler for ReadHandler { room_id = %payload.id, scope, classroom_id ) )] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, @@ -268,7 +268,7 @@ impl RequestHandler for UpdateHandler { type Payload = UpdateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -429,7 +429,7 @@ impl RequestHandler for EnterHandler { room_id = %payload.id, scope, classroom_id ) )] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, @@ -581,7 +581,7 @@ impl RequestHandler for LockedTypesHandler { type Payload = LockedTypesRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -695,7 +695,7 @@ impl RequestHandler for WhiteboardAccessHandler { type Payload = WhiteboardAccessRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -818,7 +818,7 @@ impl RequestHandler for AdjustHandler { type Payload = AdjustRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/room/dump_events.rs b/src/app/endpoint/room/dump_events.rs index 3c20ba47..447f6c61 100644 --- a/src/app/endpoint/room/dump_events.rs +++ b/src/app/endpoint/room/dump_events.rs @@ -65,7 +65,7 @@ pub struct EventsDumpHandler; impl RequestHandler for EventsDumpHandler { type Payload = EventsDumpRequest; - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/state.rs b/src/app/endpoint/state.rs index 90529f5d..9d14541d 100644 --- a/src/app/endpoint/state.rs +++ b/src/app/endpoint/state.rs @@ -62,7 +62,7 @@ impl RequestHandler for ReadHandler { type Payload = ReadRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/subscription.rs b/src/app/endpoint/subscription.rs index e8faecd3..fb228eb3 100644 --- a/src/app/endpoint/subscription.rs +++ b/src/app/endpoint/subscription.rs @@ -59,7 +59,7 @@ impl EventHandler for DeleteEventHandler { type Payload = DeleteEventPayload; #[instrument(skip_all, fields(room_id))] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, evp: &IncomingEventProperties, @@ -136,7 +136,7 @@ impl EventHandler for BroadcastDeleteEventHandler { type Payload = DeleteEventPayload; #[instrument(skip_all, fields(room_id))] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, evp: &IncomingEventProperties, diff --git a/src/app/endpoint/system.rs b/src/app/endpoint/system.rs index 5bc0912c..1952fadf 100644 --- a/src/app/endpoint/system.rs +++ b/src/app/endpoint/system.rs @@ -18,7 +18,7 @@ pub struct VacuumHandler; impl RequestHandler for VacuumHandler { type Payload = VacuumRequest; - async fn handle( + async fn handle( context: &mut C, _payload: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/error.rs b/src/app/error.rs index 21b1da9c..587568dc 100644 --- a/src/app/error.rs +++ b/src/app/error.rs @@ -50,6 +50,7 @@ pub enum ErrorKind { InternalNatsError, NatsMessageHandlingFailed, NatsPublishFailed, + NatsClientNotFound, } impl ErrorKind { @@ -283,6 +284,12 @@ impl From for ErrorKindProperties { title: "Nats publish failed", is_notify_sentry: true }, + ErrorKind::NatsClientNotFound => ErrorKindProperties { + status: ResponseStatus::FAILED_DEPENDENCY, + kind: "nats_client_not_found", + title: "Nats client not found", + is_notify_sentry: true, + }, } } } @@ -388,6 +395,22 @@ impl From for Error { } } +impl From for anyhow::Error { + fn from(e: Error) -> Self { + anyhow::anyhow!(e.to_svc_error()) + } +} + +impl From for Error { + fn from(kind: ErrorKind) -> Self { + Self { + kind, + err: None, + tags: HashMap::new(), + } + } +} + //////////////////////////////////////////////////////////////////////////////// pub trait ErrorExt { diff --git a/src/app/message_handler.rs b/src/app/message_handler.rs index 89bf6409..3e827117 100644 --- a/src/app/message_handler.rs +++ b/src/app/message_handler.rs @@ -223,7 +223,7 @@ pub fn publish_message(agent: &mut Agent, message: Message) -> Result<(), AppErr // We just need to specify the payload type and specific logic. pub trait RequestEnvelopeHandler<'async_trait> { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, request: &'async_trait IncomingRequest, ) -> Pin + Send + 'async_trait>>; @@ -235,7 +235,7 @@ pub trait RequestEnvelopeHandler<'async_trait> { impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> RequestEnvelopeHandler<'async_trait> for H { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, request: &'async_trait IncomingRequest, ) -> Pin + Send + 'async_trait>> @@ -243,7 +243,7 @@ impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> Self: Sync + 'async_trait, { // The actual implementation. - async fn handle_envelope( + async fn handle_envelope( context: &mut C, request: &IncomingRequest, ) -> MessageStream { @@ -282,7 +282,7 @@ impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> // This is the same as with the above. pub trait ResponseEnvelopeHandler<'async_trait, CD> { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, envelope: &'async_trait IncomingResponse, corr_data: &'async_trait CD, @@ -292,7 +292,7 @@ pub trait ResponseEnvelopeHandler<'async_trait, CD> { impl<'async_trait, H: 'async_trait + endpoint::ResponseHandler> ResponseEnvelopeHandler<'async_trait, H::CorrelationData> for H { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, response: &'async_trait IncomingResponse, corr_data: &'async_trait H::CorrelationData, @@ -333,7 +333,7 @@ impl<'async_trait, H: 'async_trait + endpoint::ResponseHandler> } pub trait EventEnvelopeHandler<'async_trait> { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, envelope: &'async_trait IncomingEvent, ) -> Pin + Send + 'async_trait>>; @@ -343,12 +343,12 @@ pub trait EventEnvelopeHandler<'async_trait> { impl<'async_trait, H: 'async_trait + endpoint::EventHandler> EventEnvelopeHandler<'async_trait> for H { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, event: &'async_trait IncomingEvent, ) -> Pin + Send + 'async_trait>> { // The actual implementation. - async fn handle_envelope( + async fn handle_envelope( context: &mut C, event: &IncomingEvent, ) -> MessageStream { diff --git a/src/app/mod.rs b/src/app/mod.rs index a44b5685..4f2ca4db 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -99,6 +99,23 @@ pub async fn run( None => context_builder, }; + let nats_client = match &config.nats { + Some(cfg) => { + let nats_client = svc_nats_client::Client::new(cfg.clone()) + .await + .context("nats client")?; + info!("Connected to nats"); + + Some(nats_client) + } + None => None, + }; + + let context_builder = match nats_client.clone() { + Some(nats_client) => context_builder.add_nats_client(nats_client), + None => context_builder, + }; + let context = context_builder.queue_counter(queue_counter).build(metrics); let metrics_task = config.metrics.as_ref().map(|metrics| { @@ -107,6 +124,8 @@ pub async fn run( let metrics = context.metrics(); + // for nats consumer + let context_: Arc = Arc::new(context.clone()); let ctx = Arc::new(context.clone()); let (graceful_tx, graceful_rx) = tokio::sync::watch::channel(()); let mut shutdown_server_rx = graceful_rx.clone(); @@ -120,26 +139,14 @@ pub async fn run( }), ); - let nats_consumer = match config.nats.zip(config.nats_consumer) { - Some((nats_cfg, nats_consumer_cfg)) => { - let nats_client = svc_nats_client::Client::new(nats_cfg) - .await - .context("nats client")?; - info!("Connected to nats"); - - let nats_consumer = nats_consumer::run( - ctx.clone(), - nats_client, - nats_consumer_cfg, - graceful_rx.clone(), - ) - .await - .context("nats consumer")?; - info!("Nats consumer started"); - - Some(nats_consumer) + let nats_consumer = match (nats_client, &config.nats_consumer) { + (Some(nats_client), Some(cfg)) => { + svc_nats_client::consumer::run(nats_client, cfg.clone(), graceful_rx.clone(), { + let ctx_ = context_.clone(); + move |msg| crate::app::stage::route_message(ctx_.clone(), msg) + }) } - None => None, + _ => tokio::spawn(std::future::ready(Ok(()))), }; // Message handler @@ -155,10 +162,8 @@ pub async fn run( let _ = graceful_tx.send(()); - if let Some(consumer) = nats_consumer { - if let Err(err) = consumer.await { - error!(%err, "failed to await nats consumer completion"); - } + if let Err(err) = nats_consumer.await { + tracing::error!(%err, "nats consumer failed"); } if let Some(metrics_task) = metrics_task { @@ -293,7 +298,7 @@ pub mod endpoint; pub mod error; pub mod http; pub mod message_handler; -pub mod nats_consumer; pub mod operations; pub mod s3_client; pub mod service_utils; +pub mod stage; diff --git a/src/app/nats_consumer.rs b/src/app/nats_consumer.rs deleted file mode 100644 index ee511abc..00000000 --- a/src/app/nats_consumer.rs +++ /dev/null @@ -1,301 +0,0 @@ -use crate::{ - app::{ - context::GlobalContext, - error::{Error as AppError, ErrorKind, ErrorKindExt}, - }, - config, db, -}; -use anyhow::{Context, Result}; -use chrono::{DateTime, TimeZone, Utc}; -use futures_util::StreamExt; -use serde_json::json; -use std::{str::FromStr, sync::Arc, time::Duration}; -use svc_conference_events::{Event, EventV1}; -use svc_nats_client::{ - AckKind as NatsAckKind, Client, Message, MessageStream, NatsClient, Subject, SubscribeError, -}; -use tokio::{sync::watch, task::JoinHandle, time::Instant}; -use tracing::{error, info, warn}; - -pub async fn run( - ctx: Arc, - nats_client: Client, - nats_consumer_config: config::NatsConsumer, - shutdown_rx: watch::Receiver<()>, -) -> Result>> { - let handle = tokio::spawn(async move { - // In case of subscription errors we don't want to spam sentry - let mut sentry_last_sent = Instant::now() - nats_consumer_config.suspend_sentry_interval; - - loop { - let result = nats_client.subscribe().await; - let messages = match result { - Ok(messages) => messages, - Err(err) => { - error!(%err); - - if sentry_last_sent.elapsed() >= nats_consumer_config.suspend_sentry_interval { - anyhow!(err) - .kind(ErrorKind::NatsSubscriptionFailed) - .notify_sentry(); - sentry_last_sent = Instant::now(); - } - - tokio::time::sleep(nats_consumer_config.resubscribe_interval).await; - continue; - } - }; - - // Run the loop of getting messages from the stream - let reason = handle_stream( - ctx.as_ref(), - &nats_client, - &nats_consumer_config, - messages, - shutdown_rx.clone(), - ) - .await; - - match reason { - CompletionReason::Shutdown => { - warn!("Nats consumer completes its work"); - break; - } - CompletionReason::StreamClosed => { - // If the `handle_stream` function ends, then the stream was closed. - // Send an error to sentry and try to resubscribe. - let error = anyhow!("nats stream was closed"); - error!(%error); - - if sentry_last_sent.elapsed() >= nats_consumer_config.suspend_sentry_interval { - error - .kind(ErrorKind::NatsSubscriptionFailed) - .notify_sentry(); - sentry_last_sent = Instant::now(); - } - - tokio::time::sleep(nats_consumer_config.resubscribe_interval).await; - continue; - } - } - } - - Ok::<_, SubscribeError>(()) - }); - - Ok(handle) -} - -enum CompletionReason { - Shutdown, - StreamClosed, -} - -async fn handle_stream( - ctx: &dyn GlobalContext, - nats_client: &Client, - nats_consumer_config: &config::NatsConsumer, - mut messages: MessageStream, - mut shutdown_rx: watch::Receiver<()>, -) -> CompletionReason { - let mut retry_count = 0; - let mut suspend_interval: Option = None; - - loop { - if let Some(interval) = suspend_interval.take() { - warn!( - "nats consumer suspenses the processing of nats messages on {} seconds", - interval.as_secs() - ); - tokio::time::sleep(interval).await; - } - - tokio::select! { - result = messages.next() => { - let message = match result { - Some(Ok(msg)) => msg, - Some(Err(err)) => { - // Types of internal nats errors that may arise here: - // * Heartbeat errors - // * Failed to send request - // * Consumer deleted - // * Received unknown message - anyhow!(err) - .context("internal nats error") - .kind(ErrorKind::InternalNatsError) - .log() - .notify_sentry(); - - continue; - } - None => { - // Stream was closed. Send an error to sentry and try to resubscribe. - return CompletionReason::StreamClosed; - } - }; - - info!( - "got a message from nats, subject: {:?}, payload: {:?}, headers: {:?}", - message.subject, message.payload, message.headers - ); - - let result = handle_message(ctx, &message).await; - match result { - Ok(_) => { - retry_count = 0; - - if let Err(err) = message.ack().await { - anyhow!(err) - .context("nats ack error") - .kind(ErrorKind::NatsPublishFailed) - .log() - .notify_sentry(); - } - } - Err(HandleMessageError::DbConnAcquisitionFailed(err)) => { - err.log().notify_sentry(); - - if let Err(err) = message.ack_with(NatsAckKind::Nak(None)).await { - anyhow!(err) - .context("nats nack error") - .kind(ErrorKind::NatsPublishFailed) - .log() - .notify_sentry(); - } - - retry_count += 1; - let interval = next_suspend_interval(retry_count, nats_consumer_config); - suspend_interval = Some(interval); - } - Err(HandleMessageError::Other(err)) => { - err - .kind(ErrorKind::NatsMessageHandlingFailed) - .log() - .notify_sentry(); - - if let Err(err) = nats_client.terminate(message).await { - anyhow!(err) - .context("failed to handle nats message") - .kind(ErrorKind::NatsPublishFailed) - .log() - .notify_sentry(); - } - } - } - } - // Graceful shutdown - _ = shutdown_rx.changed() => { - return CompletionReason::Shutdown; - } - } - } -} - -fn next_suspend_interval( - retry_count: u32, - nats_consumer_config: &config::NatsConsumer, -) -> Duration { - let seconds = std::cmp::min( - nats_consumer_config.suspend_interval.as_secs() * 2_u64.pow(retry_count), - nats_consumer_config.max_suspend_interval.as_secs(), - ); - - Duration::from_secs(seconds) -} - -enum HandleMessageError { - DbConnAcquisitionFailed(AppError), - Other(anyhow::Error), -} - -impl From for HandleMessageError { - fn from(error: anyhow::Error) -> Self { - HandleMessageError::Other(error) - } -} - -async fn handle_message( - ctx: &dyn GlobalContext, - message: &Message, -) -> Result<(), HandleMessageError> { - let subject = Subject::from_str(&message.subject).context("parse nats subject")?; - let entity_type = subject.entity_type(); - - let event = - serde_json::from_slice::(message.payload.as_ref()).context("parse nats payload")?; - - let (label, created_at) = match event { - Event::V1(EventV1::VideoGroup(e)) => (e.as_label().to_owned(), e.created_at()), - }; - - let classroom_id = subject.classroom_id(); - let room = { - let mut conn = ctx - .get_conn() - .await - .map_err(HandleMessageError::DbConnAcquisitionFailed)?; - - db::room::FindQuery::by_classroom_id(classroom_id) - .execute(&mut conn) - .await - .context("find room by classroom_id")? - .ok_or(HandleMessageError::Other(anyhow!( - "failed to get room by classroom_id: {}", - classroom_id - )))? - }; - - let headers = svc_nats_client::Headers::try_from(message.headers.clone().unwrap_or_default()) - .context("parse nats headers")?; - let agent_id = headers.sender_id(); - let entity_event_id = headers.event_id().sequence_id(); - - let created_at: DateTime = Utc.timestamp_nanos(created_at); - let occurred_at = room - .time() - .map(|t| { - (created_at - t.start().to_owned()) - .num_nanoseconds() - .unwrap_or(i64::MAX) - }) - .map_err(|_| HandleMessageError::Other(anyhow!("invalid room time")))?; - - let mut conn = ctx - .get_conn() - .await - .map_err(HandleMessageError::DbConnAcquisitionFailed)?; - - let result = db::event::InsertQuery::new( - room.id(), - entity_type.to_string(), - json!({ entity_type: label }), - occurred_at, - agent_id.to_owned(), - ) - .context("invalid event data")? - .entity_type(entity_type.to_string()) - .entity_event_id(entity_event_id) - .execute(&mut conn) - .await; - - if let Err(sqlx::Error::Database(ref err)) = result { - if let Some("uniq_entity_type_entity_event_id") = err.constraint() { - warn!( - "duplicate nats message, entity_type: {:?}, entity_event_id: {:?}", - entity_type.to_string(), - entity_event_id - ); - - return Ok(()); - }; - } - - if let Err(err) = result { - return Err(HandleMessageError::Other(anyhow!( - "failed to create event from nats: {}", - err - ))); - } - - Ok(()) -} diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs new file mode 100644 index 00000000..1aa7412e --- /dev/null +++ b/src/app/stage/mod.rs @@ -0,0 +1,298 @@ +use std::{convert::TryFrom, str::FromStr, sync::Arc}; + +use anyhow::Context; +use chrono::{DateTime, TimeZone, Utc}; +use svc_events::{ + ban::{BanAcceptedV1, BanCollaborationCompletedV1}, + Event, EventId, EventV1, VideoGroupEventV1, +}; +use svc_nats_client::{ + consumer::{FailureKind, FailureKindExt, HandleMessageFailure}, + Subject, +}; + +use crate::{db, metrics::QueryKey}; + +use super::{ + error::{Error, ErrorExt, ErrorKind, ErrorKindExt}, + GlobalContext, +}; + +pub async fn route_message( + ctx: Arc, + msg: Arc, +) -> Result<(), HandleMessageFailure> { + let subject = Subject::from_str(&msg.subject) + .context("parse nats subject") + .permanent()?; + + let event = serde_json::from_slice::(msg.payload.as_ref()) + .context("parse nats payload") + .permanent()?; + + let classroom_id = subject.classroom_id(); + let room = { + let mut conn = ctx + .get_conn() + .await + .map_err(anyhow::Error::from) + .transient()?; + + db::room::FindQuery::by_classroom_id(classroom_id) + .execute(&mut conn) + .await + .context("find room by classroom_id") + .transient()? + .ok_or(anyhow!( + "failed to get room by classroom_id: {}", + classroom_id + )) + .permanent()? + }; + + tracing::info!(?event, class_id = %classroom_id); + + let headers = svc_nats_client::Headers::try_from(msg.headers.clone().unwrap_or_default()) + .context("parse nats headers") + .permanent()?; + let _agent_id = headers.sender_id(); + + let r = match event { + Event::V1(EventV1::VideoGroup(e)) => { + handle_video_group(ctx.as_ref(), e, &room, subject, &headers).await + } + Event::V1(EventV1::BanAccepted(e)) => { + handle_ban_accepted(ctx.as_ref(), e, &room, subject, &headers).await + } + _ => { + // ignore + Ok(()) + } + }; + + FailureKindExt::map_err(r, |e| anyhow!(e)) +} + +async fn handle_video_group( + ctx: &(dyn GlobalContext + Sync), + e: VideoGroupEventV1, + room: &db::room::Object, + subject: Subject, + headers: &svc_nats_client::Headers, +) -> Result<(), HandleMessageFailure> { + let (label, created_at) = (e.as_label().to_owned(), e.created_at()); + let entity_type = subject.entity_type(); + let agent_id = headers.sender_id(); + let entity_event_id = headers.event_id().sequence_id(); + + let created_at: DateTime = Utc.timestamp_nanos(created_at); + let occurred_at = room + .time() + .map(|t| { + (created_at - t.start().to_owned()) + .num_nanoseconds() + .unwrap_or(i64::MAX) + }) + .map_err(|_| Error::from(ErrorKind::InvalidRoomTime)) + .permanent()?; + + let mut conn = ctx + .get_conn() + .await + .map_err(|_| Error::from(ErrorKind::DbConnAcquisitionFailed)) + .transient()?; + + let query = db::event::InsertQuery::new( + room.id(), + entity_type.to_string(), + serde_json::json!({ entity_type: label }), + occurred_at, + agent_id.to_owned(), + ) + .context("invalid event data") + .map_err(|e| e.kind(ErrorKind::InvalidEvent)) + .permanent()? + .entity_type(entity_type.to_string()) + .entity_event_id(entity_event_id); + + let result = ctx + .metrics() + .measure_query(QueryKey::EventInsertQuery, query.execute(&mut conn)) + .await; + + if let Err(sqlx::Error::Database(ref err)) = result { + if let Some("uniq_entity_type_entity_event_id") = err.constraint() { + tracing::warn!( + "duplicate nats message, entity_type: {:?}, entity_event_id: {:?}", + entity_type.to_string(), + entity_event_id + ); + + return Ok(()); + }; + } + + if let Err(err) = result { + return Err(HandleMessageFailure::Transient(Error::new( + super::error::ErrorKind::DbQueryFailed, + anyhow!("failed to create event from nats: {}", err), + ))); + } + + Ok(()) +} + +async fn handle_ban_accepted( + ctx: &(dyn GlobalContext + Sync), + e: BanAcceptedV1, + room: &db::room::Object, + subject: Subject, + headers: &svc_nats_client::Headers, +) -> Result<(), HandleMessageFailure> { + let mut conn = ctx.get_conn().await.transient()?; + + if e.ban { + let mut query = db::room_ban::InsertQuery::new(e.target_account.clone(), room.id()); + query.reason("ban event"); + + ctx.metrics() + .measure_query(QueryKey::BanInsertQuery, query.execute(&mut conn)) + .await + .context("Failed to insert room ban") + .map_err(|e| Error::new(ErrorKind::DbQueryFailed, e)) + .transient()?; + } else { + let query = db::room_ban::DeleteQuery::new(e.target_account.clone(), room.id()); + + ctx.metrics() + .measure_query(QueryKey::BanDeleteQuery, query.execute(&mut conn)) + .await + .context("Failed to delete room ban") + .map_err(|e| Error::new(ErrorKind::DbQueryFailed, e)) + .transient()?; + } + + let event_id = headers.event_id(); + let event = BanCollaborationCompletedV1::new_from_accepted(e, event_id.clone()); + let event = Event::from(event); + + let payload = serde_json::to_vec(&event) + .error(ErrorKind::InvalidPayload) + .permanent()?; + + let event_id = EventId::from(( + event_id.entity_type().to_owned(), + "collaboration_completed".to_owned(), + event_id.sequence_id(), + )); + + let event = svc_nats_client::event::Builder::new( + subject, + payload, + event_id.to_owned(), + ctx.agent_id().to_owned(), + ) + .build(); + + ctx.nats_client() + .ok_or_else(|| anyhow!("nats client not found")) + .error(ErrorKind::NatsClientNotFound) + .transient()? + .publish(&event) + .await + .error(ErrorKind::NatsPublishFailed) + .transient()?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::test_helpers::prelude::*; + + #[tokio::test] + async fn ban_accepted_handler_enables_disables_ban() { + let db = TestDb::new().await; + let agent = TestAgent::new("web", "user", USR_AUDIENCE); + + let mut conn = db.get_conn().await; + let room = shared_helpers::insert_room(&mut conn).await; + + let subject = Subject::new("test".to_string(), room.classroom_id(), "ban".to_string()); + let event_id: EventId = ("ban".to_string(), "accepted".to_string(), 0).into(); + let headers = + svc_nats_client::test_helpers::HeadersBuilder::new(event_id, agent.agent_id().clone()) + .build(); + let e = BanAcceptedV1 { + ban: true, + classroom_id: room.classroom_id(), + target_account: agent.account_id().clone(), + operation_id: 0, + }; + + let ctx = TestContext::new(db, TestAuthz::new()); + + handle_ban_accepted(&ctx, e, &room, subject, &headers) + .await + .expect("handler failed"); + + // to drop pub_reqs after all checks + { + let pub_reqs = ctx.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 1); + + let payload = serde_json::from_slice::(&pub_reqs[0].payload) + .expect("failed to parse event"); + assert!(matches!( + payload, + Event::V1(EventV1::BanCollaborationCompleted( + BanCollaborationCompletedV1 { ban: true, .. } + )) + )); + } + + let room_bans = db::room_ban::ListQuery::new(room.id()) + .execute(&mut conn) + .await + .expect("failed to list room bans"); + + assert_eq!(room_bans.len(), 1); + assert_eq!(room_bans[0].account_id(), agent.account_id()); + + let subject = Subject::new("test".to_string(), room.classroom_id(), "ban".to_string()); + let e: BanAcceptedV1 = BanAcceptedV1 { + ban: false, + classroom_id: room.classroom_id(), + target_account: agent.account_id().clone(), + operation_id: 0, + }; + + handle_ban_accepted(&ctx, e, &room, subject, &headers) + .await + .expect("handler failed"); + + // to drop pub_reqs after all checks + { + let pub_reqs = ctx.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 2); + + let payload = serde_json::from_slice::(&pub_reqs[1].payload) + .expect("failed to parse event"); + assert!(matches!( + payload, + Event::V1(EventV1::BanCollaborationCompleted( + BanCollaborationCompletedV1 { ban: false, .. } + )) + )); + } + + let room_bans = db::room_ban::ListQuery::new(room.id()) + .execute(&mut conn) + .await + .expect("failed to list room bans"); + + assert_eq!(room_bans.len(), 0); + } +} diff --git a/src/config.rs b/src/config.rs index a4e386b1..369fadeb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,7 +29,7 @@ pub struct Config { pub constraint: Constraint, pub adjust: AdjustConfig, pub nats: Option, - pub nats_consumer: Option, + pub nats_consumer: Option, } impl Config { @@ -100,15 +100,3 @@ pub struct AdjustConfig { #[serde(with = "humantime_serde")] pub min_segment_length: StdDuration, } - -#[derive(Clone, Debug, Deserialize)] -pub struct NatsConsumer { - #[serde(with = "humantime_serde")] - pub suspend_interval: StdDuration, - #[serde(with = "humantime_serde")] - pub max_suspend_interval: StdDuration, - #[serde(with = "humantime_serde")] - pub suspend_sentry_interval: StdDuration, - #[serde(with = "humantime_serde")] - pub resubscribe_interval: StdDuration, -} diff --git a/src/test_helpers/context.rs b/src/test_helpers/context.rs index b931ccff..adad0abe 100644 --- a/src/test_helpers/context.rs +++ b/src/test_helpers/context.rs @@ -6,6 +6,7 @@ use serde_json::json; use sqlx::postgres::PgPool as Db; use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::cache::ConnectionPool as RedisConnectionPool; +use svc_nats_client::{test_helpers::TestNatsClient, NatsClient}; use crate::{ app::{ @@ -69,6 +70,7 @@ pub struct TestContext { start_timestamp: DateTime, s3_client: Option, broker_client: Arc, + nats_client: Arc, } impl TestContext { @@ -86,6 +88,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), + nats_client: Arc::new(TestNatsClient::new()), } } @@ -103,6 +106,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), + nats_client: Arc::new(TestNatsClient::new()), } } @@ -120,6 +124,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), + nats_client: Arc::new(TestNatsClient::new()), } } @@ -130,6 +135,10 @@ impl TestContext { pub fn broker_client_mock(&mut self) -> &mut MockBrokerClient { Arc::get_mut(&mut self.broker_client).expect("Failed to get broker client mock") } + + pub fn inspect_nats_client(&self) -> &TestNatsClient { + &self.nats_client + } } impl GlobalContext for TestContext { @@ -172,6 +181,10 @@ impl GlobalContext for TestContext { fn broker_client(&self) -> &dyn BrokerClient { self.broker_client.as_ref() } + + fn nats_client(&self) -> Option> { + Some(self.nats_client.clone() as Arc) + } } impl MessageContext for TestContext {