From 6ee380b68d756b69ae074def9b99fcbd7668b58f Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Wed, 31 May 2023 16:45:14 +0200 Subject: [PATCH 01/20] WIP ban handle --- Cargo.lock | 96 +++++++++++++++++----------------------- Cargo.toml | 4 +- src/app/nats_consumer.rs | 66 ++++++++++++++++++++++++--- 3 files changed, 101 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f7cf6c5..8cce9ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,12 +69,9 @@ dependencies = [ [[package]] name = "async-nats" version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1174495e436c928905018f10a36160f7a8a6786450f50f4ce7fba05d1539704c" +source = "git+https://github.com/foxford/nats.rs?branch=main#f5fa297c236dcca2e406761e5cc0fe78338a0ae0" dependencies = [ - "async-nats-tokio-rustls-deps", - "base64 0.13.1", - "base64-url", + "base64 0.21.0", "bytes", "futures", "http", @@ -88,6 +85,7 @@ dependencies = [ "ring", "rustls-native-certs", "rustls-pemfile", + "rustls-webpki", "serde", "serde_json", "serde_nanos", @@ -96,21 +94,11 @@ dependencies = [ "time 0.3.20", "tokio", "tokio-retry", + "tokio-rustls 0.24.0", "tracing", "url", ] -[[package]] -name = "async-nats-tokio-rustls-deps" -version = "0.24.0-ALPHA.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdefe54cd7867d937c0a507d2a3a830af410044282cd3e4002b5b7860e1892e" -dependencies = [ - "rustls 0.21.1", - "tokio", - "webpki 0.22.0", -] - [[package]] name = "async-trait" version = "0.1.68" @@ -241,15 +229,6 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" -[[package]] -name = "base64-url" -version = "1.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" -dependencies = [ - "base64 0.13.1", -] - [[package]] name = "base64ct" version = "1.1.1" @@ -838,8 +817,8 @@ dependencies = [ "svc-agent", "svc-authn", "svc-authz", - "svc-conference-events", "svc-error", + "svc-events", "svc-nats-client", "svc-utils", "tokio", @@ -2273,8 +2252,8 @@ dependencies = [ "pollster", "thiserror", "tokio", - "tokio-rustls", - "webpki 0.21.4", + "tokio-rustls 0.22.0", + "webpki", ] [[package]] @@ -2423,7 +2402,7 @@ dependencies = [ "log", "ring", "sct 0.6.1", - "webpki 0.21.4", + "webpki", ] [[package]] @@ -3136,17 +3115,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "svc-conference-events" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9185e72a1486f9edf5d66e03a13e8e5cc514e1a579506ce4954b14c2c62a783" -dependencies = [ - "serde", - "serde_json", - "thiserror", -] - [[package]] name = "svc-error" version = "0.5.0" @@ -3168,21 +3136,37 @@ dependencies = [ ] [[package]] -name = "svc-nats-client" -version = "0.2.0" +name = "svc-events" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab57232e87e0c5c4d1ae2b810ffc971cdefd2cdf011d7c2a678cdc40a1a20c51" +checksum = "15432fcea5a7eaa54236ee253e2b573bd720ef6c94a7849be17804200830b7d5" +dependencies = [ + "serde", + "serde_json", + "svc-agent", + "svc-authn", + "thiserror", + "uuid", +] + +[[package]] +name = "svc-nats-client" +version = "0.4.0" +source = "git+https://github.com/foxford/svc-nats-client/?branch=ULMS-1896/add-transient-permanent-errors#705d841c12fbc0b26233fb5935b362540d40ff80" dependencies = [ "anyhow", "async-nats", "async-trait", "futures", + "futures-util", "humantime-serde", "reqwest", "serde", "svc-agent", "svc-error", + "svc-events", "thiserror", + "tokio", "tracing", "uuid", ] @@ -3348,9 +3332,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.0" +version = "1.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" dependencies = [ "autocfg", "bytes", @@ -3405,7 +3389,17 @@ checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.1", "tokio", - "webpki 0.21.4", + "webpki", +] + +[[package]] +name = "tokio-rustls" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" +dependencies = [ + "rustls 0.21.1", + "tokio", ] [[package]] @@ -3829,16 +3823,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "whoami" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index 3e527785..523fc6e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,8 +39,8 @@ svc-authn = { version = "0.8", features = ["jose", "sqlx"] } svc-authz = { version = "0.12" } svc-error = { version = "0.5", features = ["sqlx", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] } svc-utils = { version = "0.7", features = ["authn-extractor", "cors-middleware", "log-middleware"] } -svc-nats-client = { version = "0.2" } -svc-conference-events = { version = "0.2" } +svc-nats-client = { git = "https://github.com/foxford/svc-nats-client/", branch = "ULMS-1896/add-transient-permanent-errors" } +svc-events = "0.7" tokio = { version = "1.28", features = ["full"] } tower = "0.4" tower-http = { version = "0.4", features = ["trace", "cors"] } diff --git a/src/app/nats_consumer.rs b/src/app/nats_consumer.rs index ee511abc..91fa72a4 100644 --- a/src/app/nats_consumer.rs +++ b/src/app/nats_consumer.rs @@ -4,13 +4,14 @@ use crate::{ error::{Error as AppError, ErrorKind, ErrorKindExt}, }, config, db, + metrics::QueryKey, }; use anyhow::{Context, Result}; use chrono::{DateTime, TimeZone, Utc}; use futures_util::StreamExt; use serde_json::json; use std::{str::FromStr, sync::Arc, time::Duration}; -use svc_conference_events::{Event, EventV1}; +use svc_events::{ban::BanAcceptedV1, Event, EventV1, VideoGroupEventV1}; use svc_nats_client::{ AckKind as NatsAckKind, Client, Message, MessageStream, NatsClient, Subject, SubscribeError, }; @@ -219,15 +220,9 @@ async fn handle_message( message: &Message, ) -> Result<(), HandleMessageError> { let subject = Subject::from_str(&message.subject).context("parse nats subject")?; - let entity_type = subject.entity_type(); - let event = serde_json::from_slice::(message.payload.as_ref()).context("parse nats payload")?; - let (label, created_at) = match event { - Event::V1(EventV1::VideoGroup(e)) => (e.as_label().to_owned(), e.created_at()), - }; - let classroom_id = subject.classroom_id(); let room = { let mut conn = ctx @@ -247,6 +242,31 @@ async fn handle_message( let headers = svc_nats_client::Headers::try_from(message.headers.clone().unwrap_or_default()) .context("parse nats headers")?; + + match event { + Event::V1(EventV1::VideoGroup(e)) => { + handle_video_group(ctx, e, &room, subject, &headers).await?; + } + Event::V1(EventV1::BanAccepted(e)) => { + handle_ban_accepted(ctx, e, &room).await?; + } + _ => { + // ignore + } + }; + + Ok(()) +} + +async fn handle_video_group( + ctx: &dyn GlobalContext, + e: VideoGroupEventV1, + room: &db::room::Object, + subject: Subject, + headers: &svc_nats_client::Headers, +) -> Result<(), HandleMessageError> { + let (label, created_at) = (e.as_label().to_owned(), e.created_at()); + let entity_type = subject.entity_type(); let agent_id = headers.sender_id(); let entity_event_id = headers.event_id().sequence_id(); @@ -299,3 +319,35 @@ async fn handle_message( Ok(()) } + +async fn handle_ban_accepted( + ctx: &dyn GlobalContext, + e: BanAcceptedV1, + room: &db::room::Object, +) -> Result<(), HandleMessageError> { + let mut conn = ctx + .get_conn() + .await + .map_err(HandleMessageError::DbConnAcquisitionFailed)?; + + if e.ban { + let mut query = db::room_ban::InsertQuery::new(e.user_account, room.id()); + query.reason("ban event"); + + ctx.metrics() + .measure_query(QueryKey::BanInsertQuery, query.execute(&mut conn)) + .await + .context("Failed to insert room ban") + .map_err(HandleMessageError::Other)?; + } else { + let query = db::room_ban::DeleteQuery::new(e.user_account, room.id()); + + ctx.metrics() + .measure_query(QueryKey::BanDeleteQuery, query.execute(&mut conn)) + .await + .context("Failed to delete room ban") + .map_err(HandleMessageError::Other)?; + } + + Ok(()) +} From c4c9d36f6131bb1e60959006de61aa56cd49e072 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Fri, 2 Jun 2023 13:26:12 +0200 Subject: [PATCH 02/20] Refactor to use nats consumer from svc-nats-client --- Cargo.toml | 2 +- src/app/context.rs | 2 +- src/app/error.rs | 16 ++ src/app/mod.rs | 36 ++-- src/app/nats_consumer.rs | 353 --------------------------------------- src/app/stage/mod.rs | 211 +++++++++++++++++++++++ src/config.rs | 14 +- 7 files changed, 247 insertions(+), 387 deletions(-) delete mode 100644 src/app/nats_consumer.rs create mode 100644 src/app/stage/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 523fc6e7..43295c2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ crossbeam-channel = "0.5" enum-iterator = "1.4" futures = "0.3" futures-channel = "0.3" -futures-util = "0.3" +futures-util = "0.3.28" http = "0.2" humantime-serde = "1.1" hyper = { version = "0.14", features = [ "server" ] } diff --git a/src/app/context.rs b/src/app/context.rs index bce66c5b..0a1cf405 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -22,7 +22,7 @@ use super::broker_client::BrokerClient; pub trait Context: GlobalContext + MessageContext {} #[async_trait] -pub trait GlobalContext: Sync { +pub trait GlobalContext: Sync + Send { fn authz(&self) -> &Authz; fn config(&self) -> &Config; fn db(&self) -> &Db; diff --git a/src/app/error.rs b/src/app/error.rs index 6bfc5dac..a3e79ae7 100644 --- a/src/app/error.rs +++ b/src/app/error.rs @@ -381,6 +381,22 @@ impl From for Error { } } +impl From for anyhow::Error { + fn from(e: Error) -> Self { + anyhow::anyhow!(e.to_svc_error()) + } +} + +impl From for Error { + fn from(kind: ErrorKind) -> Self { + Self { + kind, + err: None, + tags: HashMap::new(), + } + } +} + //////////////////////////////////////////////////////////////////////////////// pub trait ErrorExt { diff --git a/src/app/mod.rs b/src/app/mod.rs index a44b5685..b80f1166 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -120,28 +120,28 @@ pub async fn run( }), ); - let nats_consumer = match config.nats.zip(config.nats_consumer) { - Some((nats_cfg, nats_consumer_cfg)) => { - let nats_client = svc_nats_client::Client::new(nats_cfg) + let nats_client = match &config.nats { + Some(cfg) => { + let nats_client = svc_nats_client::Client::new(cfg.clone()) .await .context("nats client")?; info!("Connected to nats"); - let nats_consumer = nats_consumer::run( - ctx.clone(), - nats_client, - nats_consumer_cfg, - graceful_rx.clone(), - ) - .await - .context("nats consumer")?; - info!("Nats consumer started"); - - Some(nats_consumer) + Some(nats_client) } None => None, }; + let nats_consumer = match (nats_client, &config.nats_consumer) { + (Some(nats_client), Some(cfg)) => { + svc_nats_client::consumer::run(nats_client, cfg.clone(), graceful_rx.clone(), { + let ctx_ = ctx.clone(); + move |msg| crate::app::stage::route_message(ctx_.clone(), msg) + }) + } + _ => tokio::spawn(std::future::ready(Ok(()))), + }; + // Message handler let message_handler = Arc::new(MessageHandler::new(agent.clone(), context, dispatcher)); @@ -155,10 +155,8 @@ pub async fn run( let _ = graceful_tx.send(()); - if let Some(consumer) = nats_consumer { - if let Err(err) = consumer.await { - error!(%err, "failed to await nats consumer completion"); - } + if let Err(err) = nats_consumer.await { + tracing::error!(%err, "nats consumer failed"); } if let Some(metrics_task) = metrics_task { @@ -293,7 +291,7 @@ pub mod endpoint; pub mod error; pub mod http; pub mod message_handler; -pub mod nats_consumer; pub mod operations; pub mod s3_client; pub mod service_utils; +pub mod stage; diff --git a/src/app/nats_consumer.rs b/src/app/nats_consumer.rs deleted file mode 100644 index 91fa72a4..00000000 --- a/src/app/nats_consumer.rs +++ /dev/null @@ -1,353 +0,0 @@ -use crate::{ - app::{ - context::GlobalContext, - error::{Error as AppError, ErrorKind, ErrorKindExt}, - }, - config, db, - metrics::QueryKey, -}; -use anyhow::{Context, Result}; -use chrono::{DateTime, TimeZone, Utc}; -use futures_util::StreamExt; -use serde_json::json; -use std::{str::FromStr, sync::Arc, time::Duration}; -use svc_events::{ban::BanAcceptedV1, Event, EventV1, VideoGroupEventV1}; -use svc_nats_client::{ - AckKind as NatsAckKind, Client, Message, MessageStream, NatsClient, Subject, SubscribeError, -}; -use tokio::{sync::watch, task::JoinHandle, time::Instant}; -use tracing::{error, info, warn}; - -pub async fn run( - ctx: Arc, - nats_client: Client, - nats_consumer_config: config::NatsConsumer, - shutdown_rx: watch::Receiver<()>, -) -> Result>> { - let handle = tokio::spawn(async move { - // In case of subscription errors we don't want to spam sentry - let mut sentry_last_sent = Instant::now() - nats_consumer_config.suspend_sentry_interval; - - loop { - let result = nats_client.subscribe().await; - let messages = match result { - Ok(messages) => messages, - Err(err) => { - error!(%err); - - if sentry_last_sent.elapsed() >= nats_consumer_config.suspend_sentry_interval { - anyhow!(err) - .kind(ErrorKind::NatsSubscriptionFailed) - .notify_sentry(); - sentry_last_sent = Instant::now(); - } - - tokio::time::sleep(nats_consumer_config.resubscribe_interval).await; - continue; - } - }; - - // Run the loop of getting messages from the stream - let reason = handle_stream( - ctx.as_ref(), - &nats_client, - &nats_consumer_config, - messages, - shutdown_rx.clone(), - ) - .await; - - match reason { - CompletionReason::Shutdown => { - warn!("Nats consumer completes its work"); - break; - } - CompletionReason::StreamClosed => { - // If the `handle_stream` function ends, then the stream was closed. - // Send an error to sentry and try to resubscribe. - let error = anyhow!("nats stream was closed"); - error!(%error); - - if sentry_last_sent.elapsed() >= nats_consumer_config.suspend_sentry_interval { - error - .kind(ErrorKind::NatsSubscriptionFailed) - .notify_sentry(); - sentry_last_sent = Instant::now(); - } - - tokio::time::sleep(nats_consumer_config.resubscribe_interval).await; - continue; - } - } - } - - Ok::<_, SubscribeError>(()) - }); - - Ok(handle) -} - -enum CompletionReason { - Shutdown, - StreamClosed, -} - -async fn handle_stream( - ctx: &dyn GlobalContext, - nats_client: &Client, - nats_consumer_config: &config::NatsConsumer, - mut messages: MessageStream, - mut shutdown_rx: watch::Receiver<()>, -) -> CompletionReason { - let mut retry_count = 0; - let mut suspend_interval: Option = None; - - loop { - if let Some(interval) = suspend_interval.take() { - warn!( - "nats consumer suspenses the processing of nats messages on {} seconds", - interval.as_secs() - ); - tokio::time::sleep(interval).await; - } - - tokio::select! { - result = messages.next() => { - let message = match result { - Some(Ok(msg)) => msg, - Some(Err(err)) => { - // Types of internal nats errors that may arise here: - // * Heartbeat errors - // * Failed to send request - // * Consumer deleted - // * Received unknown message - anyhow!(err) - .context("internal nats error") - .kind(ErrorKind::InternalNatsError) - .log() - .notify_sentry(); - - continue; - } - None => { - // Stream was closed. Send an error to sentry and try to resubscribe. - return CompletionReason::StreamClosed; - } - }; - - info!( - "got a message from nats, subject: {:?}, payload: {:?}, headers: {:?}", - message.subject, message.payload, message.headers - ); - - let result = handle_message(ctx, &message).await; - match result { - Ok(_) => { - retry_count = 0; - - if let Err(err) = message.ack().await { - anyhow!(err) - .context("nats ack error") - .kind(ErrorKind::NatsPublishFailed) - .log() - .notify_sentry(); - } - } - Err(HandleMessageError::DbConnAcquisitionFailed(err)) => { - err.log().notify_sentry(); - - if let Err(err) = message.ack_with(NatsAckKind::Nak(None)).await { - anyhow!(err) - .context("nats nack error") - .kind(ErrorKind::NatsPublishFailed) - .log() - .notify_sentry(); - } - - retry_count += 1; - let interval = next_suspend_interval(retry_count, nats_consumer_config); - suspend_interval = Some(interval); - } - Err(HandleMessageError::Other(err)) => { - err - .kind(ErrorKind::NatsMessageHandlingFailed) - .log() - .notify_sentry(); - - if let Err(err) = nats_client.terminate(message).await { - anyhow!(err) - .context("failed to handle nats message") - .kind(ErrorKind::NatsPublishFailed) - .log() - .notify_sentry(); - } - } - } - } - // Graceful shutdown - _ = shutdown_rx.changed() => { - return CompletionReason::Shutdown; - } - } - } -} - -fn next_suspend_interval( - retry_count: u32, - nats_consumer_config: &config::NatsConsumer, -) -> Duration { - let seconds = std::cmp::min( - nats_consumer_config.suspend_interval.as_secs() * 2_u64.pow(retry_count), - nats_consumer_config.max_suspend_interval.as_secs(), - ); - - Duration::from_secs(seconds) -} - -enum HandleMessageError { - DbConnAcquisitionFailed(AppError), - Other(anyhow::Error), -} - -impl From for HandleMessageError { - fn from(error: anyhow::Error) -> Self { - HandleMessageError::Other(error) - } -} - -async fn handle_message( - ctx: &dyn GlobalContext, - message: &Message, -) -> Result<(), HandleMessageError> { - let subject = Subject::from_str(&message.subject).context("parse nats subject")?; - let event = - serde_json::from_slice::(message.payload.as_ref()).context("parse nats payload")?; - - let classroom_id = subject.classroom_id(); - let room = { - let mut conn = ctx - .get_conn() - .await - .map_err(HandleMessageError::DbConnAcquisitionFailed)?; - - db::room::FindQuery::by_classroom_id(classroom_id) - .execute(&mut conn) - .await - .context("find room by classroom_id")? - .ok_or(HandleMessageError::Other(anyhow!( - "failed to get room by classroom_id: {}", - classroom_id - )))? - }; - - let headers = svc_nats_client::Headers::try_from(message.headers.clone().unwrap_or_default()) - .context("parse nats headers")?; - - match event { - Event::V1(EventV1::VideoGroup(e)) => { - handle_video_group(ctx, e, &room, subject, &headers).await?; - } - Event::V1(EventV1::BanAccepted(e)) => { - handle_ban_accepted(ctx, e, &room).await?; - } - _ => { - // ignore - } - }; - - Ok(()) -} - -async fn handle_video_group( - ctx: &dyn GlobalContext, - e: VideoGroupEventV1, - room: &db::room::Object, - subject: Subject, - headers: &svc_nats_client::Headers, -) -> Result<(), HandleMessageError> { - let (label, created_at) = (e.as_label().to_owned(), e.created_at()); - let entity_type = subject.entity_type(); - let agent_id = headers.sender_id(); - let entity_event_id = headers.event_id().sequence_id(); - - let created_at: DateTime = Utc.timestamp_nanos(created_at); - let occurred_at = room - .time() - .map(|t| { - (created_at - t.start().to_owned()) - .num_nanoseconds() - .unwrap_or(i64::MAX) - }) - .map_err(|_| HandleMessageError::Other(anyhow!("invalid room time")))?; - - let mut conn = ctx - .get_conn() - .await - .map_err(HandleMessageError::DbConnAcquisitionFailed)?; - - let result = db::event::InsertQuery::new( - room.id(), - entity_type.to_string(), - json!({ entity_type: label }), - occurred_at, - agent_id.to_owned(), - ) - .context("invalid event data")? - .entity_type(entity_type.to_string()) - .entity_event_id(entity_event_id) - .execute(&mut conn) - .await; - - if let Err(sqlx::Error::Database(ref err)) = result { - if let Some("uniq_entity_type_entity_event_id") = err.constraint() { - warn!( - "duplicate nats message, entity_type: {:?}, entity_event_id: {:?}", - entity_type.to_string(), - entity_event_id - ); - - return Ok(()); - }; - } - - if let Err(err) = result { - return Err(HandleMessageError::Other(anyhow!( - "failed to create event from nats: {}", - err - ))); - } - - Ok(()) -} - -async fn handle_ban_accepted( - ctx: &dyn GlobalContext, - e: BanAcceptedV1, - room: &db::room::Object, -) -> Result<(), HandleMessageError> { - let mut conn = ctx - .get_conn() - .await - .map_err(HandleMessageError::DbConnAcquisitionFailed)?; - - if e.ban { - let mut query = db::room_ban::InsertQuery::new(e.user_account, room.id()); - query.reason("ban event"); - - ctx.metrics() - .measure_query(QueryKey::BanInsertQuery, query.execute(&mut conn)) - .await - .context("Failed to insert room ban") - .map_err(HandleMessageError::Other)?; - } else { - let query = db::room_ban::DeleteQuery::new(e.user_account, room.id()); - - ctx.metrics() - .measure_query(QueryKey::BanDeleteQuery, query.execute(&mut conn)) - .await - .context("Failed to delete room ban") - .map_err(HandleMessageError::Other)?; - } - - Ok(()) -} diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs new file mode 100644 index 00000000..3ddc9dc4 --- /dev/null +++ b/src/app/stage/mod.rs @@ -0,0 +1,211 @@ +use std::{convert::TryFrom, str::FromStr, sync::Arc}; + +use anyhow::Context; +use chrono::{DateTime, TimeZone, Utc}; +use svc_events::{ban::BanAcceptedV1, Event, EventV1, VideoGroupEventV1}; +use svc_nats_client::{consumer::HandleMessageOutcome, Subject}; + +use crate::{db, metrics::QueryKey}; + +use super::{ + error::{Error, ErrorKindExt}, + GlobalContext, +}; + +// pub mod ban; + +pub async fn route_message( + ctx: Arc, + msg: Arc, +) -> HandleMessageOutcome { + match do_route_msg(ctx, msg).await { + Ok(_) => HandleMessageOutcome::Processed, + Err(HandleMsgFailure::Transient(e)) => { + tracing::error!(%e, "transient failure, retrying"); + HandleMessageOutcome::ProcessLater + } + Err(HandleMsgFailure::Permanent(e)) => { + tracing::error!(%e, "permanent failure, won't process"); + HandleMessageOutcome::WontProcess + } + } +} + +pub enum HandleMsgFailure { + Transient(E), + Permanent(E), +} + +trait FailureKind { + /// This error can be fixed by retrying later. + fn transient(self) -> Result>; + /// This error can't be fixed by retrying later (parse failure, unknown id, etc). + fn permanent(self) -> Result>; +} + +impl FailureKind for Result { + fn transient(self) -> Result> { + self.map_err(|e| HandleMsgFailure::Transient(e)) + } + + fn permanent(self) -> Result> { + self.map_err(|e| HandleMsgFailure::Permanent(e)) + } +} + +async fn do_route_msg( + ctx: Arc, + msg: Arc, +) -> Result<(), HandleMsgFailure> { + let subject = Subject::from_str(&msg.subject) + .context("parse nats subject") + .permanent()?; + + let event = serde_json::from_slice::(msg.payload.as_ref()) + .context("parse nats payload") + .permanent()?; + + let classroom_id = subject.classroom_id(); + let room = { + let mut conn = ctx + .get_conn() + .await + .map_err(anyhow::Error::from) + .transient()?; + + db::room::FindQuery::by_classroom_id(classroom_id) + .execute(&mut conn) + .await + .context("find room by classroom_id") + .transient()? + .ok_or(anyhow!( + "failed to get room by classroom_id: {}", + classroom_id + )) + .permanent()? + }; + + let headers = svc_nats_client::Headers::try_from(msg.headers.clone().unwrap_or_default()) + .context("parse nats headers") + .permanent()?; + let _agent_id = headers.sender_id(); + + let r = match event { + Event::V1(EventV1::VideoGroup(e)) => { + handle_video_group(ctx.as_ref(), e, &room, subject, &headers).await + } + Event::V1(EventV1::BanAccepted(e)) => handle_ban_accepted(ctx.as_ref(), e, &room).await, + _ => { + // ignore + Ok(()) + } + }; + + match r { + Ok(_) => Ok(()), + Err(HandleMsgFailure::Transient(e)) => Err(HandleMsgFailure::Transient(anyhow!(e))), + Err(HandleMsgFailure::Permanent(e)) => { + e.notify_sentry(); + Err(HandleMsgFailure::Permanent(anyhow!(e))) + } + } +} + +async fn handle_video_group( + ctx: &dyn GlobalContext, + e: VideoGroupEventV1, + room: &db::room::Object, + subject: Subject, + headers: &svc_nats_client::Headers, +) -> Result<(), HandleMsgFailure> { + let (label, created_at) = (e.as_label().to_owned(), e.created_at()); + let entity_type = subject.entity_type(); + let agent_id = headers.sender_id(); + let entity_event_id = headers.event_id().sequence_id(); + + let created_at: DateTime = Utc.timestamp_nanos(created_at); + let occurred_at = room + .time() + .map(|t| { + (created_at - t.start().to_owned()) + .num_nanoseconds() + .unwrap_or(i64::MAX) + }) + .map_err(|_| Error::from(super::error::ErrorKind::InvalidRoomTime)) + .permanent()?; + + let mut conn = ctx + .get_conn() + .await + .map_err(|_| Error::from(super::error::ErrorKind::DbConnAcquisitionFailed)) + .transient()?; + + let result = db::event::InsertQuery::new( + room.id(), + entity_type.to_string(), + serde_json::json!({ entity_type: label }), + occurred_at, + agent_id.to_owned(), + ) + .context("invalid event data") + .map_err(|e| e.kind(super::error::ErrorKind::InvalidEvent)) + .permanent()? + .entity_type(entity_type.to_string()) + .entity_event_id(entity_event_id) + .execute(&mut conn) + .await; + + if let Err(sqlx::Error::Database(ref err)) = result { + if let Some("uniq_entity_type_entity_event_id") = err.constraint() { + tracing::warn!( + "duplicate nats message, entity_type: {:?}, entity_event_id: {:?}", + entity_type.to_string(), + entity_event_id + ); + + return Ok(()); + }; + } + + if let Err(err) = result { + return Err(HandleMsgFailure::Transient(Error::new( + super::error::ErrorKind::DbQueryFailed, + anyhow!("failed to create event from nats: {}", err), + ))); + } + + Ok(()) +} + +async fn handle_ban_accepted( + ctx: &dyn GlobalContext, + e: BanAcceptedV1, + room: &db::room::Object, +) -> Result<(), HandleMsgFailure> { + let mut conn = ctx.get_conn().await.transient()?; + + if e.ban { + let mut query = db::room_ban::InsertQuery::new(e.user_account, room.id()); + query.reason("ban event"); + + ctx.metrics() + .measure_query(QueryKey::BanInsertQuery, query.execute(&mut conn)) + .await + .context("Failed to insert room ban") + .map_err(|e| Error::new(super::error::ErrorKind::DbQueryFailed, e)) + .transient()?; + } else { + let query = db::room_ban::DeleteQuery::new(e.user_account, room.id()); + + ctx.metrics() + .measure_query(QueryKey::BanDeleteQuery, query.execute(&mut conn)) + .await + .context("Failed to delete room ban") + .map_err(|e| Error::new(super::error::ErrorKind::DbQueryFailed, e)) + .transient()?; + } + + // TODO: publish event + + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index a4e386b1..369fadeb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,7 +29,7 @@ pub struct Config { pub constraint: Constraint, pub adjust: AdjustConfig, pub nats: Option, - pub nats_consumer: Option, + pub nats_consumer: Option, } impl Config { @@ -100,15 +100,3 @@ pub struct AdjustConfig { #[serde(with = "humantime_serde")] pub min_segment_length: StdDuration, } - -#[derive(Clone, Debug, Deserialize)] -pub struct NatsConsumer { - #[serde(with = "humantime_serde")] - pub suspend_interval: StdDuration, - #[serde(with = "humantime_serde")] - pub max_suspend_interval: StdDuration, - #[serde(with = "humantime_serde")] - pub suspend_sentry_interval: StdDuration, - #[serde(with = "humantime_serde")] - pub resubscribe_interval: StdDuration, -} From 05718e38eb91f5f1af806951318a489f4f8f9b1f Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Fri, 2 Jun 2023 14:21:46 +0200 Subject: [PATCH 03/20] Publish completed event after ban handle --- src/app/context.rs | 21 ++++++++++++++++++ src/app/error.rs | 7 ++++++ src/app/mod.rs | 29 ++++++++++++++----------- src/app/stage/mod.rs | 51 ++++++++++++++++++++++++++++++++++---------- 4 files changed, 85 insertions(+), 23 deletions(-) diff --git a/src/app/context.rs b/src/app/context.rs index 0a1cf405..00d01b52 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -7,6 +7,7 @@ use sqlx::pool::PoolConnection; use sqlx::postgres::{PgPool as Db, Postgres}; use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::cache::ConnectionPool as RedisConnectionPool; +use svc_nats_client::NatsClient; use crate::config::Config; use crate::{ @@ -33,6 +34,7 @@ pub trait GlobalContext: Sync + Send { fn metrics(&self) -> Arc; fn s3_client(&self) -> Option; fn broker_client(&self) -> &dyn BrokerClient; + fn nats_client(&self) -> Option<&dyn NatsClient>; async fn get_conn(&self) -> Result, AppError> { self.db() @@ -69,6 +71,7 @@ pub struct AppContext { metrics: Arc, s3_client: Option, broker_client: Arc, + nats_client: Option>, } impl AppContext { @@ -117,6 +120,10 @@ impl GlobalContext for AppContext { fn broker_client(&self) -> &dyn BrokerClient { self.broker_client.as_ref() } + + fn nats_client(&self) -> Option<&dyn NatsClient> { + self.nats_client.as_deref() + } } /////////////////////////////////////////////////////////////////////////////// @@ -175,6 +182,10 @@ impl<'a, C: GlobalContext> GlobalContext for AppMessageContext<'a, C> { fn broker_client(&self) -> &dyn BrokerClient { self.global_context.broker_client() } + + fn nats_client(&self) -> Option<&dyn NatsClient> { + self.global_context.nats_client() + } } impl<'a, C: GlobalContext> MessageContext for AppMessageContext<'a, C> { @@ -196,6 +207,7 @@ pub(crate) struct AppContextBuilder { agent_id: AgentId, queue_counter: Option, redis_pool: Option, + nats_client: Option>, } impl AppContextBuilder { @@ -216,6 +228,7 @@ impl AppContextBuilder { agent_id, queue_counter: None, redis_pool: None, + nats_client: None, } } @@ -240,6 +253,13 @@ impl AppContextBuilder { } } + pub fn add_nats_client(self, nats_client: impl NatsClient + 'static) -> Self { + Self { + nats_client: Some(Arc::new(nats_client)), + ..self + } + } + pub(crate) fn build(self, metrics: Arc) -> AppContext { AppContext { config: Arc::new(self.config), @@ -252,6 +272,7 @@ impl AppContextBuilder { redis_pool: self.redis_pool, metrics, s3_client: S3Client::new(), + nats_client: self.nats_client, } } } diff --git a/src/app/error.rs b/src/app/error.rs index a3e79ae7..8f010fba 100644 --- a/src/app/error.rs +++ b/src/app/error.rs @@ -49,6 +49,7 @@ pub enum ErrorKind { InternalNatsError, NatsMessageHandlingFailed, NatsPublishFailed, + NatsClientNotFound, } impl ErrorKind { @@ -276,6 +277,12 @@ impl From for ErrorKindProperties { title: "Nats publish failed", is_notify_sentry: true }, + ErrorKind::NatsClientNotFound => ErrorKindProperties { + status: ResponseStatus::FAILED_DEPENDENCY, + kind: "nats_client_not_found", + title: "Nats client not found", + is_notify_sentry: true, + }, } } } diff --git a/src/app/mod.rs b/src/app/mod.rs index b80f1166..e1638d04 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -99,6 +99,23 @@ pub async fn run( None => context_builder, }; + let nats_client = match &config.nats { + Some(cfg) => { + let nats_client = svc_nats_client::Client::new(cfg.clone()) + .await + .context("nats client")?; + info!("Connected to nats"); + + Some(nats_client) + } + None => None, + }; + + let context_builder = match nats_client.clone() { + Some(nats_client) => context_builder.add_nats_client(nats_client), + None => context_builder, + }; + let context = context_builder.queue_counter(queue_counter).build(metrics); let metrics_task = config.metrics.as_ref().map(|metrics| { @@ -120,18 +137,6 @@ pub async fn run( }), ); - let nats_client = match &config.nats { - Some(cfg) => { - let nats_client = svc_nats_client::Client::new(cfg.clone()) - .await - .context("nats client")?; - info!("Connected to nats"); - - Some(nats_client) - } - None => None, - }; - let nats_consumer = match (nats_client, &config.nats_consumer) { (Some(nats_client), Some(cfg)) => { svc_nats_client::consumer::run(nats_client, cfg.clone(), graceful_rx.clone(), { diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index 3ddc9dc4..6766fc4e 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -2,13 +2,16 @@ use std::{convert::TryFrom, str::FromStr, sync::Arc}; use anyhow::Context; use chrono::{DateTime, TimeZone, Utc}; -use svc_events::{ban::BanAcceptedV1, Event, EventV1, VideoGroupEventV1}; +use svc_events::{ + ban::{BanAcceptedV1, BanCollaborationCompletedV1}, + Event, EventV1, VideoGroupEventV1, +}; use svc_nats_client::{consumer::HandleMessageOutcome, Subject}; use crate::{db, metrics::QueryKey}; use super::{ - error::{Error, ErrorKindExt}, + error::{Error, ErrorExt, ErrorKind, ErrorKindExt}, GlobalContext, }; @@ -94,7 +97,9 @@ async fn do_route_msg( Event::V1(EventV1::VideoGroup(e)) => { handle_video_group(ctx.as_ref(), e, &room, subject, &headers).await } - Event::V1(EventV1::BanAccepted(e)) => handle_ban_accepted(ctx.as_ref(), e, &room).await, + Event::V1(EventV1::BanAccepted(e)) => { + handle_ban_accepted(ctx.as_ref(), e, &room, subject, &headers).await + } _ => { // ignore Ok(()) @@ -131,13 +136,13 @@ async fn handle_video_group( .num_nanoseconds() .unwrap_or(i64::MAX) }) - .map_err(|_| Error::from(super::error::ErrorKind::InvalidRoomTime)) + .map_err(|_| Error::from(ErrorKind::InvalidRoomTime)) .permanent()?; let mut conn = ctx .get_conn() .await - .map_err(|_| Error::from(super::error::ErrorKind::DbConnAcquisitionFailed)) + .map_err(|_| Error::from(ErrorKind::DbConnAcquisitionFailed)) .transient()?; let result = db::event::InsertQuery::new( @@ -148,7 +153,7 @@ async fn handle_video_group( agent_id.to_owned(), ) .context("invalid event data") - .map_err(|e| e.kind(super::error::ErrorKind::InvalidEvent)) + .map_err(|e| e.kind(ErrorKind::InvalidEvent)) .permanent()? .entity_type(entity_type.to_string()) .entity_event_id(entity_event_id) @@ -181,31 +186,55 @@ async fn handle_ban_accepted( ctx: &dyn GlobalContext, e: BanAcceptedV1, room: &db::room::Object, + subject: Subject, + headers: &svc_nats_client::Headers, ) -> Result<(), HandleMsgFailure> { let mut conn = ctx.get_conn().await.transient()?; if e.ban { - let mut query = db::room_ban::InsertQuery::new(e.user_account, room.id()); + let mut query = db::room_ban::InsertQuery::new(e.user_account.clone(), room.id()); query.reason("ban event"); ctx.metrics() .measure_query(QueryKey::BanInsertQuery, query.execute(&mut conn)) .await .context("Failed to insert room ban") - .map_err(|e| Error::new(super::error::ErrorKind::DbQueryFailed, e)) + .map_err(|e| Error::new(ErrorKind::DbQueryFailed, e)) .transient()?; } else { - let query = db::room_ban::DeleteQuery::new(e.user_account, room.id()); + let query = db::room_ban::DeleteQuery::new(e.user_account.clone(), room.id()); ctx.metrics() .measure_query(QueryKey::BanDeleteQuery, query.execute(&mut conn)) .await .context("Failed to delete room ban") - .map_err(|e| Error::new(super::error::ErrorKind::DbQueryFailed, e)) + .map_err(|e| Error::new(ErrorKind::DbQueryFailed, e)) .transient()?; } - // TODO: publish event + let event_id = headers.event_id(); + let event = BanCollaborationCompletedV1::new_from_accepted(e, event_id.clone()); + + let payload = serde_json::to_vec(&event) + .error(ErrorKind::InvalidPayload) + .permanent()?; + + let event = svc_nats_client::event::Builder::new( + subject, + payload, + event_id.to_owned(), + ctx.agent_id().to_owned(), + ) + .build(); + + ctx.nats_client() + .ok_or_else(|| anyhow!("nats client not found")) + .error(ErrorKind::NatsClientNotFound) + .transient()? + .publish(&event) + .await + .error(ErrorKind::NatsPublishFailed) + .transient()?; Ok(()) } From 81710d5e27fdd46f698b235f2fbab1a599c2f955 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Mon, 5 Jun 2023 14:01:08 +0200 Subject: [PATCH 04/20] Update svc-events --- src/app/stage/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index 6766fc4e..77670291 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -15,8 +15,6 @@ use super::{ GlobalContext, }; -// pub mod ban; - pub async fn route_message( ctx: Arc, msg: Arc, @@ -192,7 +190,7 @@ async fn handle_ban_accepted( let mut conn = ctx.get_conn().await.transient()?; if e.ban { - let mut query = db::room_ban::InsertQuery::new(e.user_account.clone(), room.id()); + let mut query = db::room_ban::InsertQuery::new(e.target_account.clone(), room.id()); query.reason("ban event"); ctx.metrics() @@ -202,7 +200,7 @@ async fn handle_ban_accepted( .map_err(|e| Error::new(ErrorKind::DbQueryFailed, e)) .transient()?; } else { - let query = db::room_ban::DeleteQuery::new(e.user_account.clone(), room.id()); + let query = db::room_ban::DeleteQuery::new(e.target_account.clone(), room.id()); ctx.metrics() .measure_query(QueryKey::BanDeleteQuery, query.execute(&mut conn)) From cca7bd9ccaa2ecddc79c0830f5f10d4a44d46c88 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Mon, 5 Jun 2023 15:10:34 +0200 Subject: [PATCH 05/20] Attempt to remove super traits for GlobalContext --- src/app/context.rs | 6 +++--- src/app/endpoint/agent.rs | 4 ++-- src/app/endpoint/ban.rs | 2 +- src/app/endpoint/event.rs | 4 ++-- src/app/endpoint/helpers.rs | 2 +- src/app/endpoint/mod.rs | 8 ++++---- src/app/endpoint/room.rs | 14 +++++++------- src/app/endpoint/room/dump_events.rs | 2 +- src/app/endpoint/state.rs | 2 +- src/app/endpoint/subscription.rs | 2 +- src/app/endpoint/system.rs | 2 +- src/app/message_handler.rs | 12 ++++++------ src/app/mod.rs | 4 +++- src/app/stage/mod.rs | 8 ++++---- src/test_helpers/context.rs | 29 ++++++++++++++++++++++++++++ 15 files changed, 66 insertions(+), 35 deletions(-) diff --git a/src/app/context.rs b/src/app/context.rs index 00d01b52..96e23ce4 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -23,7 +23,7 @@ use super::broker_client::BrokerClient; pub trait Context: GlobalContext + MessageContext {} #[async_trait] -pub trait GlobalContext: Sync + Send { +pub trait GlobalContext { fn authz(&self) -> &Authz; fn config(&self) -> &Config; fn db(&self) -> &Db; @@ -188,13 +188,13 @@ impl<'a, C: GlobalContext> GlobalContext for AppMessageContext<'a, C> { } } -impl<'a, C: GlobalContext> MessageContext for AppMessageContext<'a, C> { +impl<'a, C: GlobalContext + Sync> MessageContext for AppMessageContext<'a, C> { fn start_timestamp(&self) -> DateTime { self.start_timestamp } } -impl<'a, C: GlobalContext> Context for AppMessageContext<'a, C> {} +impl<'a, C: GlobalContext + Sync> Context for AppMessageContext<'a, C> {} /////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/endpoint/agent.rs b/src/app/endpoint/agent.rs index 9ecbbfe8..3451ae0a 100644 --- a/src/app/endpoint/agent.rs +++ b/src/app/endpoint/agent.rs @@ -61,7 +61,7 @@ pub(crate) struct ListHandler; impl RequestHandler for ListHandler { type Payload = ListRequest; - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -173,7 +173,7 @@ impl RequestHandler for UpdateHandler { type Payload = UpdateRequest; #[instrument(skip_all, fields(scope, room_id, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/ban.rs b/src/app/endpoint/ban.rs index dc3cc842..73f0056c 100644 --- a/src/app/endpoint/ban.rs +++ b/src/app/endpoint/ban.rs @@ -42,7 +42,7 @@ pub(crate) struct ListHandler; impl RequestHandler for ListHandler { type Payload = ListRequest; - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/event.rs b/src/app/endpoint/event.rs index 9cf9cc43..0616a2cb 100644 --- a/src/app/endpoint/event.rs +++ b/src/app/endpoint/event.rs @@ -85,7 +85,7 @@ impl RequestHandler for CreateHandler { type Payload = CreateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -355,7 +355,7 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/helpers.rs b/src/app/endpoint/helpers.rs index cd779dc8..aab57f81 100644 --- a/src/app/endpoint/helpers.rs +++ b/src/app/endpoint/helpers.rs @@ -40,7 +40,7 @@ pub(crate) enum RoomTimeRequirement { Open, } -pub(crate) async fn find_room( +pub(crate) async fn find_room( context: &mut C, id: Uuid, opening_requirement: RoomTimeRequirement, diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index be4f328c..b7a0392d 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -23,7 +23,7 @@ pub(crate) type MqttResult = StdResult; pub(crate) trait RequestHandler { type Payload: Send + DeserializeOwned; - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, @@ -32,7 +32,7 @@ pub(crate) trait RequestHandler { macro_rules! request_routes { ($($m: pat => $h: ty),*) => { - pub(crate) async fn route_request( + pub(crate) async fn route_request( context: &mut C, request: &IncomingRequest, ) -> Option { @@ -102,7 +102,7 @@ pub(crate) trait ResponseHandler { pub(crate) trait EventHandler { type Payload: Send + DeserializeOwned; - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, evp: &IncomingEventProperties, @@ -112,7 +112,7 @@ pub(crate) trait EventHandler { macro_rules! event_routes { ($($l: pat => $h: ty),*) => { #[allow(unused_variables)] - pub(crate) async fn route_event( + pub(crate) async fn route_event( context: &mut C, event: &IncomingEvent, ) -> Option { diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index 37fae964..05d9544d 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -69,7 +69,7 @@ impl RequestHandler for CreateHandler { type Payload = CreateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, @@ -195,7 +195,7 @@ impl RequestHandler for ReadHandler { room_id = %payload.id, scope, classroom_id ) )] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, @@ -269,7 +269,7 @@ impl RequestHandler for UpdateHandler { type Payload = UpdateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -430,7 +430,7 @@ impl RequestHandler for EnterHandler { room_id = %payload.id, scope, classroom_id ) )] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, @@ -582,7 +582,7 @@ impl RequestHandler for LockedTypesHandler { type Payload = LockedTypesRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -696,7 +696,7 @@ impl RequestHandler for WhiteboardAccessHandler { type Payload = WhiteboardAccessRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, @@ -819,7 +819,7 @@ impl RequestHandler for AdjustHandler { type Payload = AdjustRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/room/dump_events.rs b/src/app/endpoint/room/dump_events.rs index 03df67b6..f89eac27 100644 --- a/src/app/endpoint/room/dump_events.rs +++ b/src/app/endpoint/room/dump_events.rs @@ -65,7 +65,7 @@ pub(crate) struct EventsDumpHandler; impl RequestHandler for EventsDumpHandler { type Payload = EventsDumpRequest; - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/state.rs b/src/app/endpoint/state.rs index 6a12e969..71e48820 100644 --- a/src/app/endpoint/state.rs +++ b/src/app/endpoint/state.rs @@ -62,7 +62,7 @@ impl RequestHandler for ReadHandler { type Payload = ReadRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( + async fn handle( context: &mut C, Self::Payload { room_id, payload }: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/endpoint/subscription.rs b/src/app/endpoint/subscription.rs index cb5fbbde..0720faae 100644 --- a/src/app/endpoint/subscription.rs +++ b/src/app/endpoint/subscription.rs @@ -59,7 +59,7 @@ impl EventHandler for DeleteEventHandler { type Payload = DeleteEventPayload; #[instrument(skip_all, fields(room_id))] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, evp: &IncomingEventProperties, diff --git a/src/app/endpoint/system.rs b/src/app/endpoint/system.rs index 400f23ab..1947b666 100644 --- a/src/app/endpoint/system.rs +++ b/src/app/endpoint/system.rs @@ -18,7 +18,7 @@ pub(crate) struct VacuumHandler; impl RequestHandler for VacuumHandler { type Payload = VacuumRequest; - async fn handle( + async fn handle( context: &mut C, _payload: Self::Payload, reqp: RequestParams<'_>, diff --git a/src/app/message_handler.rs b/src/app/message_handler.rs index 71407a51..79c40288 100644 --- a/src/app/message_handler.rs +++ b/src/app/message_handler.rs @@ -223,7 +223,7 @@ pub(crate) fn publish_message(agent: &mut Agent, message: Message) -> Result<(), // We just need to specify the payload type and specific logic. pub(crate) trait RequestEnvelopeHandler<'async_trait> { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, request: &'async_trait IncomingRequest, ) -> Pin + Send + 'async_trait>>; @@ -235,7 +235,7 @@ pub(crate) trait RequestEnvelopeHandler<'async_trait> { impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> RequestEnvelopeHandler<'async_trait> for H { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, request: &'async_trait IncomingRequest, ) -> Pin + Send + 'async_trait>> @@ -243,7 +243,7 @@ impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> Self: Sync + 'async_trait, { // The actual implementation. - async fn handle_envelope( + async fn handle_envelope( context: &mut C, request: &IncomingRequest, ) -> MessageStream { @@ -333,7 +333,7 @@ impl<'async_trait, H: 'async_trait + endpoint::ResponseHandler> } pub(crate) trait EventEnvelopeHandler<'async_trait> { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, envelope: &'async_trait IncomingEvent, ) -> Pin + Send + 'async_trait>>; @@ -343,12 +343,12 @@ pub(crate) trait EventEnvelopeHandler<'async_trait> { impl<'async_trait, H: 'async_trait + endpoint::EventHandler> EventEnvelopeHandler<'async_trait> for H { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, event: &'async_trait IncomingEvent, ) -> Pin + Send + 'async_trait>> { // The actual implementation. - async fn handle_envelope( + async fn handle_envelope( context: &mut C, event: &IncomingEvent, ) -> MessageStream { diff --git a/src/app/mod.rs b/src/app/mod.rs index e1638d04..4f2ca4db 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -124,6 +124,8 @@ pub async fn run( let metrics = context.metrics(); + // for nats consumer + let context_: Arc = Arc::new(context.clone()); let ctx = Arc::new(context.clone()); let (graceful_tx, graceful_rx) = tokio::sync::watch::channel(()); let mut shutdown_server_rx = graceful_rx.clone(); @@ -140,7 +142,7 @@ pub async fn run( let nats_consumer = match (nats_client, &config.nats_consumer) { (Some(nats_client), Some(cfg)) => { svc_nats_client::consumer::run(nats_client, cfg.clone(), graceful_rx.clone(), { - let ctx_ = ctx.clone(); + let ctx_ = context_.clone(); move |msg| crate::app::stage::route_message(ctx_.clone(), msg) }) } diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index 77670291..7eae3b47 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -16,7 +16,7 @@ use super::{ }; pub async fn route_message( - ctx: Arc, + ctx: Arc, msg: Arc, ) -> HandleMessageOutcome { match do_route_msg(ctx, msg).await { @@ -55,7 +55,7 @@ impl FailureKind for Result { } async fn do_route_msg( - ctx: Arc, + ctx: Arc, msg: Arc, ) -> Result<(), HandleMsgFailure> { let subject = Subject::from_str(&msg.subject) @@ -115,7 +115,7 @@ async fn do_route_msg( } async fn handle_video_group( - ctx: &dyn GlobalContext, + ctx: &(dyn GlobalContext + Sync), e: VideoGroupEventV1, room: &db::room::Object, subject: Subject, @@ -181,7 +181,7 @@ async fn handle_video_group( } async fn handle_ban_accepted( - ctx: &dyn GlobalContext, + ctx: &(dyn GlobalContext + Sync), e: BanAcceptedV1, room: &db::room::Object, subject: Subject, diff --git a/src/test_helpers/context.rs b/src/test_helpers/context.rs index 864e3499..c51decb7 100644 --- a/src/test_helpers/context.rs +++ b/src/test_helpers/context.rs @@ -1,11 +1,15 @@ use std::sync::Arc; +use async_trait::async_trait; use chrono::{DateTime, Utc}; use prometheus::Registry; use serde_json::json; use sqlx::postgres::PgPool as Db; use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::cache::ConnectionPool as RedisConnectionPool; +use svc_nats_client::{ + Event, Message, MessageStream, NatsClient, PublishError, SubscribeError, TermMessageError, +}; use crate::{ app::{ @@ -69,6 +73,7 @@ pub(crate) struct TestContext { start_timestamp: DateTime, s3_client: Option, broker_client: Arc, + nats_client: Option>, } impl TestContext { @@ -86,6 +91,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), + nats_client: Some(Arc::new(TestNatsClient {}) as Arc), } } @@ -103,6 +109,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), + nats_client: Some(Arc::new(TestNatsClient {}) as Arc), } } @@ -120,6 +127,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), + nats_client: Some(Arc::new(TestNatsClient {}) as Arc), } } @@ -132,6 +140,23 @@ impl TestContext { } } +struct TestNatsClient; + +#[async_trait] +impl NatsClient for TestNatsClient { + async fn publish(&self, _event: &Event) -> Result<(), PublishError> { + Ok(()) + } + + async fn subscribe(&self) -> Result { + unimplemented!() + } + + async fn terminate(&self, _message: &Message) -> Result<(), TermMessageError> { + unimplemented!() + } +} + impl GlobalContext for TestContext { fn authz(&self) -> &Authz { &self.authz @@ -172,6 +197,10 @@ impl GlobalContext for TestContext { fn broker_client(&self) -> &dyn BrokerClient { self.broker_client.as_ref() } + + fn nats_client(&self) -> Option<&dyn NatsClient> { + self.nats_client.as_deref() + } } impl MessageContext for TestContext { From 3cfd857bc6a57aa754f1a856c658b964c18ae247 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Mon, 5 Jun 2023 17:03:55 +0200 Subject: [PATCH 06/20] Avoid ICE + remove super trait from MessageContext trait --- src/app/context.rs | 2 +- src/app/endpoint/agent.rs | 12 ++++---- src/app/endpoint/ban.rs | 6 ++-- src/app/endpoint/event.rs | 12 ++++---- src/app/endpoint/mod.rs | 12 ++++---- src/app/endpoint/room.rs | 42 ++++++++++++++-------------- src/app/endpoint/room/dump_events.rs | 6 ++-- src/app/endpoint/state.rs | 6 ++-- src/app/endpoint/subscription.rs | 4 +-- src/app/endpoint/system.rs | 6 ++-- src/app/message_handler.rs | 16 +++++------ src/app/stage/mod.rs | 8 +++--- 12 files changed, 66 insertions(+), 66 deletions(-) diff --git a/src/app/context.rs b/src/app/context.rs index 96e23ce4..2c2f3b4a 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -53,7 +53,7 @@ pub trait GlobalContext { } } -pub trait MessageContext: Send { +pub trait MessageContext { fn start_timestamp(&self) -> DateTime; } diff --git a/src/app/endpoint/agent.rs b/src/app/endpoint/agent.rs index 3451ae0a..811354cc 100644 --- a/src/app/endpoint/agent.rs +++ b/src/app/endpoint/agent.rs @@ -61,10 +61,10 @@ pub(crate) struct ListHandler; impl RequestHandler for ListHandler { type Payload = ListRequest; - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Open).await?; @@ -173,10 +173,10 @@ impl RequestHandler for UpdateHandler { type Payload = UpdateRequest; #[instrument(skip_all, fields(scope, room_id, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Open).await?; diff --git a/src/app/endpoint/ban.rs b/src/app/endpoint/ban.rs index 73f0056c..88dd1898 100644 --- a/src/app/endpoint/ban.rs +++ b/src/app/endpoint/ban.rs @@ -42,10 +42,10 @@ pub(crate) struct ListHandler; impl RequestHandler for ListHandler { type Payload = ListRequest; - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { room_id }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Open).await?; diff --git a/src/app/endpoint/event.rs b/src/app/endpoint/event.rs index 0616a2cb..6d5c4d83 100644 --- a/src/app/endpoint/event.rs +++ b/src/app/endpoint/event.rs @@ -85,10 +85,10 @@ impl RequestHandler for CreateHandler { type Payload = CreateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let (room, author) = { let room = @@ -355,10 +355,10 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index b7a0392d..acad748d 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -23,16 +23,16 @@ pub(crate) type MqttResult = StdResult; pub(crate) trait RequestHandler { type Payload: Send + DeserializeOwned; - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult; } macro_rules! request_routes { ($($m: pat => $h: ty),*) => { - pub(crate) async fn route_request( + pub(crate) async fn route_request( context: &mut C, request: &IncomingRequest, ) -> Option { @@ -102,7 +102,7 @@ pub(crate) trait ResponseHandler { pub(crate) trait EventHandler { type Payload: Send + DeserializeOwned; - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, evp: &IncomingEventProperties, @@ -112,7 +112,7 @@ pub(crate) trait EventHandler { macro_rules! event_routes { ($($l: pat => $h: ty),*) => { #[allow(unused_variables)] - pub(crate) async fn route_event( + pub(crate) async fn route_event( context: &mut C, event: &IncomingEvent, ) -> Option { diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index 05d9544d..fac70c23 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -69,10 +69,10 @@ impl RequestHandler for CreateHandler { type Payload = CreateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { // Validate opening time. match RoomTime::new(payload.time) { @@ -195,10 +195,10 @@ impl RequestHandler for ReadHandler { room_id = %payload.id, scope, classroom_id ) )] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, payload.id, helpers::RoomTimeRequirement::Any).await?; @@ -269,10 +269,10 @@ impl RequestHandler for UpdateHandler { type Payload = UpdateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let time_requirement = if payload.time.is_some() { // Forbid changing time of a closed room. @@ -430,10 +430,10 @@ impl RequestHandler for EnterHandler { room_id = %payload.id, scope, classroom_id ) )] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, payload.id, helpers::RoomTimeRequirement::Open).await?; @@ -582,10 +582,10 @@ impl RequestHandler for LockedTypesHandler { type Payload = LockedTypesRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { // Find realtime room. let room = helpers::find_room(context, id, helpers::RoomTimeRequirement::Any).await?; @@ -696,10 +696,10 @@ impl RequestHandler for WhiteboardAccessHandler { type Payload = WhiteboardAccessRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { // Find realtime room. let room = helpers::find_room(context, id, helpers::RoomTimeRequirement::Any).await?; @@ -819,10 +819,10 @@ impl RequestHandler for AdjustHandler { type Payload = AdjustRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { // Find realtime room. let room = helpers::find_room(context, id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/room/dump_events.rs b/src/app/endpoint/room/dump_events.rs index f89eac27..9a17d7b0 100644 --- a/src/app/endpoint/room/dump_events.rs +++ b/src/app/endpoint/room/dump_events.rs @@ -65,10 +65,10 @@ pub(crate) struct EventsDumpHandler; impl RequestHandler for EventsDumpHandler { type Payload = EventsDumpRequest; - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, payload.id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/state.rs b/src/app/endpoint/state.rs index 71e48820..27cff23d 100644 --- a/src/app/endpoint/state.rs +++ b/src/app/endpoint/state.rs @@ -62,10 +62,10 @@ impl RequestHandler for ReadHandler { type Payload = ReadRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { Span::current().record("room_id", &display(room_id)); diff --git a/src/app/endpoint/subscription.rs b/src/app/endpoint/subscription.rs index 0720faae..6423d591 100644 --- a/src/app/endpoint/subscription.rs +++ b/src/app/endpoint/subscription.rs @@ -59,7 +59,7 @@ impl EventHandler for DeleteEventHandler { type Payload = DeleteEventPayload; #[instrument(skip_all, fields(room_id))] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, evp: &IncomingEventProperties, @@ -136,7 +136,7 @@ impl EventHandler for BroadcastDeleteEventHandler { type Payload = DeleteEventPayload; #[instrument(skip_all, fields(room_id))] - async fn handle( + async fn handle( context: &mut C, payload: Self::Payload, evp: &IncomingEventProperties, diff --git a/src/app/endpoint/system.rs b/src/app/endpoint/system.rs index 1947b666..db3fe31f 100644 --- a/src/app/endpoint/system.rs +++ b/src/app/endpoint/system.rs @@ -18,10 +18,10 @@ pub(crate) struct VacuumHandler; impl RequestHandler for VacuumHandler { type Payload = VacuumRequest; - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, _payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { // Authz: only trusted subjects. let authz_time = context diff --git a/src/app/message_handler.rs b/src/app/message_handler.rs index 79c40288..7e27e979 100644 --- a/src/app/message_handler.rs +++ b/src/app/message_handler.rs @@ -223,7 +223,7 @@ pub(crate) fn publish_message(agent: &mut Agent, message: Message) -> Result<(), // We just need to specify the payload type and specific logic. pub(crate) trait RequestEnvelopeHandler<'async_trait> { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, request: &'async_trait IncomingRequest, ) -> Pin + Send + 'async_trait>>; @@ -235,7 +235,7 @@ pub(crate) trait RequestEnvelopeHandler<'async_trait> { impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> RequestEnvelopeHandler<'async_trait> for H { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, request: &'async_trait IncomingRequest, ) -> Pin + Send + 'async_trait>> @@ -243,7 +243,7 @@ impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> Self: Sync + 'async_trait, { // The actual implementation. - async fn handle_envelope( + async fn handle_envelope( context: &mut C, request: &IncomingRequest, ) -> MessageStream { @@ -282,7 +282,7 @@ impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> // This is the same as with the above. pub(crate) trait ResponseEnvelopeHandler<'async_trait, CD> { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, envelope: &'async_trait IncomingResponse, corr_data: &'async_trait CD, @@ -292,7 +292,7 @@ pub(crate) trait ResponseEnvelopeHandler<'async_trait, CD> { impl<'async_trait, H: 'async_trait + endpoint::ResponseHandler> ResponseEnvelopeHandler<'async_trait, H::CorrelationData> for H { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, response: &'async_trait IncomingResponse, corr_data: &'async_trait H::CorrelationData, @@ -333,7 +333,7 @@ impl<'async_trait, H: 'async_trait + endpoint::ResponseHandler> } pub(crate) trait EventEnvelopeHandler<'async_trait> { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, envelope: &'async_trait IncomingEvent, ) -> Pin + Send + 'async_trait>>; @@ -343,12 +343,12 @@ pub(crate) trait EventEnvelopeHandler<'async_trait> { impl<'async_trait, H: 'async_trait + endpoint::EventHandler> EventEnvelopeHandler<'async_trait> for H { - fn handle_envelope( + fn handle_envelope( context: &'async_trait mut C, event: &'async_trait IncomingEvent, ) -> Pin + Send + 'async_trait>> { // The actual implementation. - async fn handle_envelope( + async fn handle_envelope( context: &mut C, event: &IncomingEvent, ) -> MessageStream { diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index 7eae3b47..a9df64e3 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -19,7 +19,7 @@ pub async fn route_message( ctx: Arc, msg: Arc, ) -> HandleMessageOutcome { - match do_route_msg(ctx, msg).await { + match do_route_msg(ctx.as_ref(), msg).await { Ok(_) => HandleMessageOutcome::Processed, Err(HandleMsgFailure::Transient(e)) => { tracing::error!(%e, "transient failure, retrying"); @@ -55,7 +55,7 @@ impl FailureKind for Result { } async fn do_route_msg( - ctx: Arc, + ctx: &(dyn GlobalContext + Sync + Send), msg: Arc, ) -> Result<(), HandleMsgFailure> { let subject = Subject::from_str(&msg.subject) @@ -93,10 +93,10 @@ async fn do_route_msg( let r = match event { Event::V1(EventV1::VideoGroup(e)) => { - handle_video_group(ctx.as_ref(), e, &room, subject, &headers).await + handle_video_group(ctx, e, &room, subject, &headers).await } Event::V1(EventV1::BanAccepted(e)) => { - handle_ban_accepted(ctx.as_ref(), e, &room, subject, &headers).await + handle_ban_accepted(ctx, e, &room, subject, &headers).await } _ => { // ignore From 3295df4989f726b7cb80b03def068b4a3c6087e4 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Mon, 5 Jun 2023 17:11:25 +0200 Subject: [PATCH 07/20] Add note --- src/app/endpoint/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index acad748d..3c5b6dd9 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -23,6 +23,10 @@ pub(crate) type MqttResult = StdResult; pub(crate) trait RequestHandler { type Payload: Send + DeserializeOwned; + // this lifetime is not elided (e.g. RequestParams<'_') + // to avoid ICE with some rustc versions + // tested on 1.67 and 1.69 + // maybe other versions have this ICE as well async fn handle<'a, C: Context + Sync + Send>( context: &'a mut C, payload: Self::Payload, From f2cfa06803e7b66d0d011f29b82026650ac9bc53 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Mon, 5 Jun 2023 17:20:40 +0200 Subject: [PATCH 08/20] Wrap event insert query in 'measure_query' --- src/app/stage/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index a9df64e3..9e7f987a 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -143,7 +143,7 @@ async fn handle_video_group( .map_err(|_| Error::from(ErrorKind::DbConnAcquisitionFailed)) .transient()?; - let result = db::event::InsertQuery::new( + let query = db::event::InsertQuery::new( room.id(), entity_type.to_string(), serde_json::json!({ entity_type: label }), @@ -154,9 +154,12 @@ async fn handle_video_group( .map_err(|e| e.kind(ErrorKind::InvalidEvent)) .permanent()? .entity_type(entity_type.to_string()) - .entity_event_id(entity_event_id) - .execute(&mut conn) - .await; + .entity_event_id(entity_event_id); + + let result = ctx + .metrics() + .measure_query(QueryKey::EventInsertQuery, query.execute(&mut conn)) + .await; if let Err(sqlx::Error::Database(ref err)) = result { if let Some("uniq_entity_type_entity_event_id") = err.constraint() { From 58235573d5a97439f553a7ff593d958ef3b94a9b Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Mon, 5 Jun 2023 17:24:58 +0200 Subject: [PATCH 09/20] Remove unused trait bounds --- src/app/context.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/app/context.rs b/src/app/context.rs index 2c2f3b4a..e6532eb3 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -188,13 +188,13 @@ impl<'a, C: GlobalContext> GlobalContext for AppMessageContext<'a, C> { } } -impl<'a, C: GlobalContext + Sync> MessageContext for AppMessageContext<'a, C> { +impl<'a, C: GlobalContext> MessageContext for AppMessageContext<'a, C> { fn start_timestamp(&self) -> DateTime { self.start_timestamp } } -impl<'a, C: GlobalContext + Sync> Context for AppMessageContext<'a, C> {} +impl<'a, C: GlobalContext> Context for AppMessageContext<'a, C> {} /////////////////////////////////////////////////////////////////////////////// From 79b4227ba0bbefee58722a370c8221524515bf3a Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Tue, 6 Jun 2023 14:46:14 +0200 Subject: [PATCH 10/20] Fix for updated versions --- sqlx-data.json | 132 +++++++++++++++++++++++++++++ src/app/endpoint/change/create.rs | 6 +- src/app/endpoint/change/delete.rs | 6 +- src/app/endpoint/change/list.rs | 6 +- src/app/endpoint/edition/commit.rs | 6 +- src/app/endpoint/edition/create.rs | 6 +- src/app/endpoint/edition/delete.rs | 6 +- src/app/endpoint/edition/list.rs | 6 +- src/app/stage/mod.rs | 65 +++----------- src/test_helpers/context.rs | 14 ++- 10 files changed, 176 insertions(+), 77 deletions(-) diff --git a/sqlx-data.json b/sqlx-data.json index b3aa7bfa..77b583e0 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -1185,6 +1185,138 @@ }, "query": "\n INSERT INTO adjustment (room_id, started_at, segments, \"offset\")\n VALUES ($1, $2, $3, $4)\n RETURNING\n room_id,\n started_at,\n segments AS \"segments!: Segments\",\n \"offset\",\n created_at\n " }, + "4655b9d7ff2c98f8ab757e13c267f9714428f2db508ffac956b642a05c8d7371": { + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Uuid" + }, + { + "name": "agent_id!: AgentId", + "ordinal": 1, + "type_info": { + "Custom": { + "kind": { + "Composite": [ + [ + "account_id", + { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + } + ], + [ + "label", + "Text" + ] + ] + }, + "name": "agent_id" + } + } + }, + { + "name": "room_id", + "ordinal": 2, + "type_info": "Uuid" + }, + { + "name": "status!: Status", + "ordinal": 3, + "type_info": { + "Custom": { + "kind": { + "Enum": [ + "in_progress", + "ready" + ] + }, + "name": "agent_status" + } + } + }, + { + "name": "created_at", + "ordinal": 4, + "type_info": "Timestamptz" + } + ], + "nullable": [ + false, + false, + false, + false, + false + ], + "parameters": { + "Left": [ + { + "Custom": { + "kind": { + "Composite": [ + [ + "account_id", + { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + } + ], + [ + "label", + "Text" + ] + ] + }, + "name": "agent_id" + } + }, + "Uuid", + { + "Custom": { + "kind": { + "Enum": [ + "in_progress", + "ready" + ] + }, + "name": "agent_status" + } + }, + "Int8", + "Int8" + ] + } + }, + "query": "\n SELECT\n id,\n agent_id AS \"agent_id!: AgentId\",\n room_id,\n status AS \"status!: Status\",\n created_at\n FROM agent\n WHERE ($1::agent_id IS NULL OR agent_id = $1)\n AND ($2::uuid IS NULL OR room_id = $2)\n AND ($3::agent_status IS NULL OR status = $3)\n ORDER BY created_at DESC LIMIT $4 OFFSET $5\n " + }, "6039c8fc3c3a35e4104b47ef5d3763a5dabb4e7e8a42ddc8f8f5fe787919b28e": { "describe": { "columns": [ diff --git a/src/app/endpoint/change/create.rs b/src/app/endpoint/change/create.rs index b47809b5..eb849353 100644 --- a/src/app/endpoint/change/create.rs +++ b/src/app/endpoint/change/create.rs @@ -48,10 +48,10 @@ impl RequestHandler for CreateHandler { scope, room_id, classroom_id, change_id ) )] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let (_edition, room) = { let query = db::edition::FindWithRoomQuery::new(payload.edition_id); diff --git a/src/app/endpoint/change/delete.rs b/src/app/endpoint/change/delete.rs index f2a30fd8..352e9dde 100644 --- a/src/app/endpoint/change/delete.rs +++ b/src/app/endpoint/change/delete.rs @@ -48,10 +48,10 @@ impl RequestHandler for DeleteHandler { scope, room_id, classroom_id, edition_id ) )] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let (change, room) = { let query = db::change::FindWithRoomQuery::new(payload.id); diff --git a/src/app/endpoint/change/list.rs b/src/app/endpoint/change/list.rs index 49e24a0d..3cfdc7c0 100644 --- a/src/app/endpoint/change/list.rs +++ b/src/app/endpoint/change/list.rs @@ -50,10 +50,10 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(edition_id, scope, room_id, classroom_id, change_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { Span::current().record("edition_id", &display(id)); let (edition, room) = { diff --git a/src/app/endpoint/edition/commit.rs b/src/app/endpoint/edition/commit.rs index eab9fe71..a3dd0862 100644 --- a/src/app/endpoint/edition/commit.rs +++ b/src/app/endpoint/edition/commit.rs @@ -56,13 +56,13 @@ impl RequestHandler for CommitHandler { type Payload = CommitRequest; #[instrument(skip_all, fields(edition_id, offset, room_id, scope, classroom_id,))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, CommitRequest { id, payload: CommitPayload { offset }, }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { Span::current().record("edition_id", &display(id)); Span::current().record("offset", &display(offset)); diff --git a/src/app/endpoint/edition/create.rs b/src/app/endpoint/edition/create.rs index f46ccdae..35593342 100644 --- a/src/app/endpoint/edition/create.rs +++ b/src/app/endpoint/edition/create.rs @@ -46,10 +46,10 @@ impl RequestHandler for CreateHandler { scope, classroom_id, edition_id ) )] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, payload.room_id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/edition/delete.rs b/src/app/endpoint/edition/delete.rs index b03c86e4..df2d468c 100644 --- a/src/app/endpoint/edition/delete.rs +++ b/src/app/endpoint/edition/delete.rs @@ -46,10 +46,10 @@ impl RequestHandler for DeleteHandler { room_id, scope, classroom_id ) )] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, payload: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let (edition, room) = { let query = db::edition::FindWithRoomQuery::new(payload.id); diff --git a/src/app/endpoint/edition/list.rs b/src/app/endpoint/edition/list.rs index 225eb329..3d7d134f 100644 --- a/src/app/endpoint/edition/list.rs +++ b/src/app/endpoint/edition/list.rs @@ -50,10 +50,10 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle( - context: &mut C, + async fn handle<'a, C: Context + Sync + Send>( + context: &'a mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'_>, + reqp: RequestParams<'a>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index 9e7f987a..ac9268f3 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -6,7 +6,10 @@ use svc_events::{ ban::{BanAcceptedV1, BanCollaborationCompletedV1}, Event, EventV1, VideoGroupEventV1, }; -use svc_nats_client::{consumer::HandleMessageOutcome, Subject}; +use svc_nats_client::{ + consumer::{FailureKind, FailureKindExt, HandleMessageFailure}, + Subject, +}; use crate::{db, metrics::QueryKey}; @@ -18,46 +21,7 @@ use super::{ pub async fn route_message( ctx: Arc, msg: Arc, -) -> HandleMessageOutcome { - match do_route_msg(ctx.as_ref(), msg).await { - Ok(_) => HandleMessageOutcome::Processed, - Err(HandleMsgFailure::Transient(e)) => { - tracing::error!(%e, "transient failure, retrying"); - HandleMessageOutcome::ProcessLater - } - Err(HandleMsgFailure::Permanent(e)) => { - tracing::error!(%e, "permanent failure, won't process"); - HandleMessageOutcome::WontProcess - } - } -} - -pub enum HandleMsgFailure { - Transient(E), - Permanent(E), -} - -trait FailureKind { - /// This error can be fixed by retrying later. - fn transient(self) -> Result>; - /// This error can't be fixed by retrying later (parse failure, unknown id, etc). - fn permanent(self) -> Result>; -} - -impl FailureKind for Result { - fn transient(self) -> Result> { - self.map_err(|e| HandleMsgFailure::Transient(e)) - } - - fn permanent(self) -> Result> { - self.map_err(|e| HandleMsgFailure::Permanent(e)) - } -} - -async fn do_route_msg( - ctx: &(dyn GlobalContext + Sync + Send), - msg: Arc, -) -> Result<(), HandleMsgFailure> { +) -> Result<(), HandleMessageFailure> { let subject = Subject::from_str(&msg.subject) .context("parse nats subject") .permanent()?; @@ -93,10 +57,10 @@ async fn do_route_msg( let r = match event { Event::V1(EventV1::VideoGroup(e)) => { - handle_video_group(ctx, e, &room, subject, &headers).await + handle_video_group(ctx.as_ref(), e, &room, subject, &headers).await } Event::V1(EventV1::BanAccepted(e)) => { - handle_ban_accepted(ctx, e, &room, subject, &headers).await + handle_ban_accepted(ctx.as_ref(), e, &room, subject, &headers).await } _ => { // ignore @@ -104,14 +68,7 @@ async fn do_route_msg( } }; - match r { - Ok(_) => Ok(()), - Err(HandleMsgFailure::Transient(e)) => Err(HandleMsgFailure::Transient(anyhow!(e))), - Err(HandleMsgFailure::Permanent(e)) => { - e.notify_sentry(); - Err(HandleMsgFailure::Permanent(anyhow!(e))) - } - } + FailureKindExt::map_err(r, |e| anyhow!(e)) } async fn handle_video_group( @@ -120,7 +77,7 @@ async fn handle_video_group( room: &db::room::Object, subject: Subject, headers: &svc_nats_client::Headers, -) -> Result<(), HandleMsgFailure> { +) -> Result<(), HandleMessageFailure> { let (label, created_at) = (e.as_label().to_owned(), e.created_at()); let entity_type = subject.entity_type(); let agent_id = headers.sender_id(); @@ -174,7 +131,7 @@ async fn handle_video_group( } if let Err(err) = result { - return Err(HandleMsgFailure::Transient(Error::new( + return Err(HandleMessageFailure::Transient(Error::new( super::error::ErrorKind::DbQueryFailed, anyhow!("failed to create event from nats: {}", err), ))); @@ -189,7 +146,7 @@ async fn handle_ban_accepted( room: &db::room::Object, subject: Subject, headers: &svc_nats_client::Headers, -) -> Result<(), HandleMsgFailure> { +) -> Result<(), HandleMessageFailure> { let mut conn = ctx.get_conn().await.transient()?; if e.ban { diff --git a/src/test_helpers/context.rs b/src/test_helpers/context.rs index c51decb7..1d1e9fa6 100644 --- a/src/test_helpers/context.rs +++ b/src/test_helpers/context.rs @@ -8,7 +8,8 @@ use sqlx::postgres::PgPool as Db; use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::cache::ConnectionPool as RedisConnectionPool; use svc_nats_client::{ - Event, Message, MessageStream, NatsClient, PublishError, SubscribeError, TermMessageError, + AckPolicy, DeliverPolicy, Event, Message, MessageStream, Messages, NatsClient, PublishError, + Subject, SubscribeError, TermMessageError, }; use crate::{ @@ -148,7 +149,16 @@ impl NatsClient for TestNatsClient { Ok(()) } - async fn subscribe(&self) -> Result { + async fn subscribe_durable(&self) -> Result { + unimplemented!() + } + + async fn subscribe_ephemeral( + &self, + _subject: Subject, + _deliver_policy: DeliverPolicy, + _ack_policy: AckPolicy, + ) -> Result { unimplemented!() } From 7cbd69b80d9d7505f92c8c58e861562ac1557619 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Wed, 7 Jun 2023 11:34:22 +0200 Subject: [PATCH 11/20] Elide lifetime in trait def --- docker/Dockerfile | 2 +- docker/migration.dockerfile | 4 +-- src/app/endpoint/agent.rs | 12 ++++---- src/app/endpoint/ban.rs | 6 ++-- src/app/endpoint/change/create.rs | 6 ++-- src/app/endpoint/change/delete.rs | 6 ++-- src/app/endpoint/change/list.rs | 6 ++-- src/app/endpoint/edition/commit.rs | 6 ++-- src/app/endpoint/edition/create.rs | 6 ++-- src/app/endpoint/edition/delete.rs | 6 ++-- src/app/endpoint/edition/list.rs | 6 ++-- src/app/endpoint/event.rs | 12 ++++---- src/app/endpoint/mod.rs | 10 ++----- src/app/endpoint/room.rs | 42 ++++++++++++++-------------- src/app/endpoint/room/dump_events.rs | 6 ++-- src/app/endpoint/state.rs | 6 ++-- src/app/endpoint/system.rs | 6 ++-- 17 files changed, 72 insertions(+), 76 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 630296ad..773f1aa3 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,7 +1,7 @@ ## ----------------------------------------------------------------------------- ## Build ## ----------------------------------------------------------------------------- -FROM rust:1.68.0-slim-buster as build-stage +FROM rust:1.70.0-slim-buster as build-stage RUN apt update && apt install -y --no-install-recommends \ pkg-config \ diff --git a/docker/migration.dockerfile b/docker/migration.dockerfile index 81e56577..88166540 100644 --- a/docker/migration.dockerfile +++ b/docker/migration.dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.68.0-slim-buster +FROM rust:1.70.0-slim-buster RUN apt update && apt install -y --no-install-recommends \ pkg-config \ @@ -6,7 +6,7 @@ RUN apt update && apt install -y --no-install-recommends \ libcurl4-openssl-dev \ libpq-dev -RUN cargo install sqlx-cli --version 0.5.7 --no-default-features --features postgres +RUN cargo install sqlx-cli --version 0.6.3 --no-default-features --features postgres WORKDIR /app CMD ["cargo", "sqlx", "migrate", "run"] COPY ./migrations /app/migrations diff --git a/src/app/endpoint/agent.rs b/src/app/endpoint/agent.rs index 811354cc..ab2ff04c 100644 --- a/src/app/endpoint/agent.rs +++ b/src/app/endpoint/agent.rs @@ -61,10 +61,10 @@ pub(crate) struct ListHandler; impl RequestHandler for ListHandler { type Payload = ListRequest; - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Open).await?; @@ -173,10 +173,10 @@ impl RequestHandler for UpdateHandler { type Payload = UpdateRequest; #[instrument(skip_all, fields(scope, room_id, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Open).await?; diff --git a/src/app/endpoint/ban.rs b/src/app/endpoint/ban.rs index 88dd1898..2cd93c5b 100644 --- a/src/app/endpoint/ban.rs +++ b/src/app/endpoint/ban.rs @@ -42,10 +42,10 @@ pub(crate) struct ListHandler; impl RequestHandler for ListHandler { type Payload = ListRequest; - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { room_id }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Open).await?; diff --git a/src/app/endpoint/change/create.rs b/src/app/endpoint/change/create.rs index eb849353..56a51047 100644 --- a/src/app/endpoint/change/create.rs +++ b/src/app/endpoint/change/create.rs @@ -48,10 +48,10 @@ impl RequestHandler for CreateHandler { scope, room_id, classroom_id, change_id ) )] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let (_edition, room) = { let query = db::edition::FindWithRoomQuery::new(payload.edition_id); diff --git a/src/app/endpoint/change/delete.rs b/src/app/endpoint/change/delete.rs index 352e9dde..5192645a 100644 --- a/src/app/endpoint/change/delete.rs +++ b/src/app/endpoint/change/delete.rs @@ -48,10 +48,10 @@ impl RequestHandler for DeleteHandler { scope, room_id, classroom_id, edition_id ) )] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let (change, room) = { let query = db::change::FindWithRoomQuery::new(payload.id); diff --git a/src/app/endpoint/change/list.rs b/src/app/endpoint/change/list.rs index 3cfdc7c0..2310647c 100644 --- a/src/app/endpoint/change/list.rs +++ b/src/app/endpoint/change/list.rs @@ -50,10 +50,10 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(edition_id, scope, room_id, classroom_id, change_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { Span::current().record("edition_id", &display(id)); let (edition, room) = { diff --git a/src/app/endpoint/edition/commit.rs b/src/app/endpoint/edition/commit.rs index a3dd0862..9fa18b66 100644 --- a/src/app/endpoint/edition/commit.rs +++ b/src/app/endpoint/edition/commit.rs @@ -56,13 +56,13 @@ impl RequestHandler for CommitHandler { type Payload = CommitRequest; #[instrument(skip_all, fields(edition_id, offset, room_id, scope, classroom_id,))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, CommitRequest { id, payload: CommitPayload { offset }, }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { Span::current().record("edition_id", &display(id)); Span::current().record("offset", &display(offset)); diff --git a/src/app/endpoint/edition/create.rs b/src/app/endpoint/edition/create.rs index 35593342..52d4915a 100644 --- a/src/app/endpoint/edition/create.rs +++ b/src/app/endpoint/edition/create.rs @@ -46,10 +46,10 @@ impl RequestHandler for CreateHandler { scope, classroom_id, edition_id ) )] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, payload.room_id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/edition/delete.rs b/src/app/endpoint/edition/delete.rs index df2d468c..2c74fb2f 100644 --- a/src/app/endpoint/edition/delete.rs +++ b/src/app/endpoint/edition/delete.rs @@ -46,10 +46,10 @@ impl RequestHandler for DeleteHandler { room_id, scope, classroom_id ) )] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let (edition, room) = { let query = db::edition::FindWithRoomQuery::new(payload.id); diff --git a/src/app/endpoint/edition/list.rs b/src/app/endpoint/edition/list.rs index 3d7d134f..957ce636 100644 --- a/src/app/endpoint/edition/list.rs +++ b/src/app/endpoint/edition/list.rs @@ -50,10 +50,10 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/event.rs b/src/app/endpoint/event.rs index 6d5c4d83..5759eecc 100644 --- a/src/app/endpoint/event.rs +++ b/src/app/endpoint/event.rs @@ -85,10 +85,10 @@ impl RequestHandler for CreateHandler { type Payload = CreateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let (room, author) = { let room = @@ -355,10 +355,10 @@ impl RequestHandler for ListHandler { type Payload = ListRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, room_id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index 3c5b6dd9..81b0bf47 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -23,14 +23,10 @@ pub(crate) type MqttResult = StdResult; pub(crate) trait RequestHandler { type Payload: Send + DeserializeOwned; - // this lifetime is not elided (e.g. RequestParams<'_') - // to avoid ICE with some rustc versions - // tested on 1.67 and 1.69 - // maybe other versions have this ICE as well - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult; } diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index fac70c23..e5bde245 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -69,10 +69,10 @@ impl RequestHandler for CreateHandler { type Payload = CreateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { // Validate opening time. match RoomTime::new(payload.time) { @@ -195,10 +195,10 @@ impl RequestHandler for ReadHandler { room_id = %payload.id, scope, classroom_id ) )] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, payload.id, helpers::RoomTimeRequirement::Any).await?; @@ -269,10 +269,10 @@ impl RequestHandler for UpdateHandler { type Payload = UpdateRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let time_requirement = if payload.time.is_some() { // Forbid changing time of a closed room. @@ -430,10 +430,10 @@ impl RequestHandler for EnterHandler { room_id = %payload.id, scope, classroom_id ) )] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, payload.id, helpers::RoomTimeRequirement::Open).await?; @@ -582,10 +582,10 @@ impl RequestHandler for LockedTypesHandler { type Payload = LockedTypesRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { // Find realtime room. let room = helpers::find_room(context, id, helpers::RoomTimeRequirement::Any).await?; @@ -696,10 +696,10 @@ impl RequestHandler for WhiteboardAccessHandler { type Payload = WhiteboardAccessRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { // Find realtime room. let room = helpers::find_room(context, id, helpers::RoomTimeRequirement::Any).await?; @@ -819,10 +819,10 @@ impl RequestHandler for AdjustHandler { type Payload = AdjustRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { // Find realtime room. let room = helpers::find_room(context, id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/room/dump_events.rs b/src/app/endpoint/room/dump_events.rs index 9a17d7b0..f17f10c6 100644 --- a/src/app/endpoint/room/dump_events.rs +++ b/src/app/endpoint/room/dump_events.rs @@ -65,10 +65,10 @@ pub(crate) struct EventsDumpHandler; impl RequestHandler for EventsDumpHandler { type Payload = EventsDumpRequest; - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { let room = helpers::find_room(context, payload.id, helpers::RoomTimeRequirement::Any).await?; diff --git a/src/app/endpoint/state.rs b/src/app/endpoint/state.rs index 27cff23d..e72ea577 100644 --- a/src/app/endpoint/state.rs +++ b/src/app/endpoint/state.rs @@ -62,10 +62,10 @@ impl RequestHandler for ReadHandler { type Payload = ReadRequest; #[instrument(skip_all, fields(room_id, scope, classroom_id))] - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, Self::Payload { room_id, payload }: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { Span::current().record("room_id", &display(room_id)); diff --git a/src/app/endpoint/system.rs b/src/app/endpoint/system.rs index db3fe31f..1f9e2478 100644 --- a/src/app/endpoint/system.rs +++ b/src/app/endpoint/system.rs @@ -18,10 +18,10 @@ pub(crate) struct VacuumHandler; impl RequestHandler for VacuumHandler { type Payload = VacuumRequest; - async fn handle<'a, C: Context + Sync + Send>( - context: &'a mut C, + async fn handle( + context: &mut C, _payload: Self::Payload, - reqp: RequestParams<'a>, + reqp: RequestParams<'_>, ) -> RequestResult { // Authz: only trusted subjects. let authz_time = context From 81d43d950cee23945727daa67d0ea0b55d32ee9d Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Wed, 7 Jun 2023 12:35:27 +0200 Subject: [PATCH 12/20] Use latest crate versions from crates.io --- Cargo.lock | 66 ++++++++++++++++++++++++++++++++++++------------------ Cargo.toml | 4 ++-- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cce9ea7..5269007e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,9 +69,12 @@ dependencies = [ [[package]] name = "async-nats" version = "0.29.0" -source = "git+https://github.com/foxford/nats.rs?branch=main#f5fa297c236dcca2e406761e5cc0fe78338a0ae0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1174495e436c928905018f10a36160f7a8a6786450f50f4ce7fba05d1539704c" dependencies = [ - "base64 0.21.0", + "async-nats-tokio-rustls-deps", + "base64 0.13.1", + "base64-url", "bytes", "futures", "http", @@ -85,7 +88,6 @@ dependencies = [ "ring", "rustls-native-certs", "rustls-pemfile", - "rustls-webpki", "serde", "serde_json", "serde_nanos", @@ -94,11 +96,21 @@ dependencies = [ "time 0.3.20", "tokio", "tokio-retry", - "tokio-rustls 0.24.0", "tracing", "url", ] +[[package]] +name = "async-nats-tokio-rustls-deps" +version = "0.24.0-ALPHA.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cdefe54cd7867d937c0a507d2a3a830af410044282cd3e4002b5b7860e1892e" +dependencies = [ + "rustls 0.21.1", + "tokio", + "webpki 0.22.0", +] + [[package]] name = "async-trait" version = "0.1.68" @@ -229,6 +241,15 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +[[package]] +name = "base64-url" +version = "1.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" +dependencies = [ + "base64 0.13.1", +] + [[package]] name = "base64ct" version = "1.1.1" @@ -2252,8 +2273,8 @@ dependencies = [ "pollster", "thiserror", "tokio", - "tokio-rustls 0.22.0", - "webpki", + "tokio-rustls", + "webpki 0.21.4", ] [[package]] @@ -2402,7 +2423,7 @@ dependencies = [ "log", "ring", "sct 0.6.1", - "webpki", + "webpki 0.21.4", ] [[package]] @@ -3137,9 +3158,9 @@ dependencies = [ [[package]] name = "svc-events" -version = "0.7.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15432fcea5a7eaa54236ee253e2b573bd720ef6c94a7849be17804200830b7d5" +checksum = "a30b9e3f342270165a9e7543f9ba85a45bff586cc04f4d495ffd8b4a44447974" dependencies = [ "serde", "serde_json", @@ -3151,8 +3172,9 @@ dependencies = [ [[package]] name = "svc-nats-client" -version = "0.4.0" -source = "git+https://github.com/foxford/svc-nats-client/?branch=ULMS-1896/add-transient-permanent-errors#705d841c12fbc0b26233fb5935b362540d40ff80" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c35276c77ae7ff528a5fb243f16bf95065a3ac8c91837356f2ee797c9a697203" dependencies = [ "anyhow", "async-nats", @@ -3389,17 +3411,7 @@ checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.1", "tokio", - "webpki", -] - -[[package]] -name = "tokio-rustls" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" -dependencies = [ - "rustls 0.21.1", - "tokio", + "webpki 0.21.4", ] [[package]] @@ -3823,6 +3835,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "whoami" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index 43295c2a..8a82da20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,8 +39,8 @@ svc-authn = { version = "0.8", features = ["jose", "sqlx"] } svc-authz = { version = "0.12" } svc-error = { version = "0.5", features = ["sqlx", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] } svc-utils = { version = "0.7", features = ["authn-extractor", "cors-middleware", "log-middleware"] } -svc-nats-client = { git = "https://github.com/foxford/svc-nats-client/", branch = "ULMS-1896/add-transient-permanent-errors" } -svc-events = "0.7" +svc-nats-client = "0.5" +svc-events = "0.9" tokio = { version = "1.28", features = ["full"] } tower = "0.4" tower-http = { version = "0.4", features = ["trace", "cors"] } From 3c2d619f22805530b67ee76d97806d425609b45c Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Thu, 22 Jun 2023 16:37:56 +0200 Subject: [PATCH 13/20] Fix migration container build --- docker/migration.dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/migration.dockerfile b/docker/migration.dockerfile index 88166540..92d1123f 100644 --- a/docker/migration.dockerfile +++ b/docker/migration.dockerfile @@ -6,7 +6,7 @@ RUN apt update && apt install -y --no-install-recommends \ libcurl4-openssl-dev \ libpq-dev -RUN cargo install sqlx-cli --version 0.6.3 --no-default-features --features postgres +RUN cargo install sqlx-cli --version 0.6.3 --no-default-features --features native-tls,postgres WORKDIR /app CMD ["cargo", "sqlx", "migrate", "run"] COPY ./migrations /app/migrations From 3b87531767a27fa15f4c656a586dc782b49bf019 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Fri, 23 Jun 2023 14:00:11 +0200 Subject: [PATCH 14/20] Fix config --- chart/Chart.yaml | 2 +- chart/templates/app-cm.yaml | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/chart/Chart.yaml b/chart/Chart.yaml index edc564b9..5d704c36 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -16,4 +16,4 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.2.9 +version: 0.2.10 diff --git a/chart/templates/app-cm.yaml b/chart/templates/app-cm.yaml index c9fc4ce2..1713f809 100644 --- a/chart/templates/app-cm.yaml +++ b/chart/templates/app-cm.yaml @@ -33,10 +33,10 @@ data: [nats] url = {{ .url | quote }} creds = {{ .creds | quote }} - subscribe.stream = {{ .subscribe.stream | quote }} - subscribe.consumer = {{ .subscribe.consumer | quote }} - subscribe.batch = {{ .subscribe.batch }} - subscribe.idle_heartbeat = {{ .subscribe.idle_heartbeat | quote }} + subscribe_durable.stream = {{ .subscribe.stream | quote }} + subscribe_durable.consumer = {{ .subscribe.consumer | quote }} + subscribe_durable.batch = {{ .subscribe.batch }} + subscribe_durable.idle_heartbeat = {{ .subscribe.idle_heartbeat | quote }} {{- end }} {{- with .Values.nats_consumer }} From a919dad4bd74ee623d42f97c8a553617581ae838 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Fri, 23 Jun 2023 14:46:14 +0200 Subject: [PATCH 15/20] Fix event format --- src/app/stage/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index ac9268f3..dac0968f 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -172,6 +172,7 @@ async fn handle_ban_accepted( let event_id = headers.event_id(); let event = BanCollaborationCompletedV1::new_from_accepted(e, event_id.clone()); + let event = Event::from(event); let payload = serde_json::to_vec(&event) .error(ErrorKind::InvalidPayload) From 49febd2d2dc96a3583b625c796d85d71c42f6aea Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Fri, 23 Jun 2023 17:37:56 +0200 Subject: [PATCH 16/20] Fix event id --- src/app/stage/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index dac0968f..5e483557 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -4,7 +4,7 @@ use anyhow::Context; use chrono::{DateTime, TimeZone, Utc}; use svc_events::{ ban::{BanAcceptedV1, BanCollaborationCompletedV1}, - Event, EventV1, VideoGroupEventV1, + Event, EventId, EventV1, VideoGroupEventV1, }; use svc_nats_client::{ consumer::{FailureKind, FailureKindExt, HandleMessageFailure}, @@ -178,6 +178,12 @@ async fn handle_ban_accepted( .error(ErrorKind::InvalidPayload) .permanent()?; + let event_id = EventId::from(( + event_id.entity_type().to_owned(), + "collaboration_completed".to_owned(), + event_id.sequence_id(), + )); + let event = svc_nats_client::event::Builder::new( subject, payload, From bebb8fa17bab9a6a0703f82ca567f917248ab179 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Tue, 11 Jul 2023 14:03:52 +0200 Subject: [PATCH 17/20] Add trace --- src/app/stage/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index 5e483557..14cdebb2 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -50,6 +50,8 @@ pub async fn route_message( .permanent()? }; + tracing::info!(?event, class_id = %classroom_id); + let headers = svc_nats_client::Headers::try_from(msg.headers.clone().unwrap_or_default()) .context("parse nats headers") .permanent()?; From 4955f6be549b4b9450680c2895f6bcbef1945c50 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Tue, 11 Jul 2023 14:19:10 +0200 Subject: [PATCH 18/20] Allow student to read banned users --- src/app/endpoint/ban.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app/endpoint/ban.rs b/src/app/endpoint/ban.rs index 2cd93c5b..e5181b60 100644 --- a/src/app/endpoint/ban.rs +++ b/src/app/endpoint/ban.rs @@ -61,7 +61,7 @@ impl RequestHandler for ListHandler { room.audience().into(), reqp.as_account_id().to_owned(), object, - "update".into(), + "read".into(), ) .await?; From fb92e63ff93a995a43fb4975d37b24b1a973a5a0 Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Tue, 18 Jul 2023 12:23:32 +0200 Subject: [PATCH 19/20] Fix tests --- src/app/endpoint/ban.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/app/endpoint/ban.rs b/src/app/endpoint/ban.rs index a01d2332..a49517a8 100644 --- a/src/app/endpoint/ban.rs +++ b/src/app/endpoint/ban.rs @@ -132,7 +132,7 @@ mod tests { authz.allow( agent.account_id(), vec!["classrooms", &room.classroom_id().to_string()], - "update", + "read", ); let mut context = TestContext::new(db, authz); @@ -162,15 +162,7 @@ mod tests { shared_helpers::insert_room(&mut conn).await }; - let mut authz = TestAuthz::new(); - let classroom_id = room.classroom_id().to_string(); - authz.allow( - agent.account_id(), - vec!["classrooms", &classroom_id], - "read", - ); - - let mut context = TestContext::new(db, authz); + let mut context = TestContext::new(db, TestAuthz::new()); let payload = ListRequest { room_id: room.id() }; From dbb409498646c23c6e70666c65fa58027a4d829c Mon Sep 17 00:00:00 2001 From: Dmitry Shlagoff Date: Thu, 20 Jul 2023 12:18:04 +0200 Subject: [PATCH 20/20] Add tests --- Cargo.lock | 154 +++++++++++++++--------------------- Cargo.toml | 10 +-- src/app/context.rs | 8 +- src/app/stage/mod.rs | 91 +++++++++++++++++++++ src/test_helpers/context.rs | 44 +++-------- 5 files changed, 173 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d516b66f..8c75e04a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,26 +91,25 @@ dependencies = [ [[package]] name = "async-nats" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1174495e436c928905018f10a36160f7a8a6786450f50f4ce7fba05d1539704c" +checksum = "94e3e851ddf3b62be8a8085e1e453968df9cdbf990a37bbb589b5b4f587c68d7" dependencies = [ - "async-nats-tokio-rustls-deps", - "base64 0.13.1", - "base64-url", + "base64 0.21.2", "bytes", "futures", "http", "itoa 1.0.8", "memchr", "nkeys", - "nuid", + "nuid 0.3.2", "once_cell", "rand", "regex", "ring", "rustls-native-certs", "rustls-pemfile", + "rustls-webpki", "serde", "serde_json", "serde_nanos", @@ -119,21 +118,11 @@ dependencies = [ "time 0.3.22", "tokio", "tokio-retry", + "tokio-rustls 0.24.1", "tracing", "url", ] -[[package]] -name = "async-nats-tokio-rustls-deps" -version = "0.24.0-ALPHA.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdefe54cd7867d937c0a507d2a3a830af410044282cd3e4002b5b7860e1892e" -dependencies = [ - "rustls 0.21.2", - "tokio", - "webpki 0.22.0", -] - [[package]] name = "async-trait" version = "0.1.70" @@ -264,15 +253,6 @@ version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" -[[package]] -name = "base64-url" -version = "1.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" -dependencies = [ - "base64 0.13.1", -] - [[package]] name = "base64ct" version = "1.1.1" @@ -337,6 +317,9 @@ name = "bytes" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -571,29 +554,6 @@ dependencies = [ "const-oid", ] -[[package]] -name = "diesel" -version = "1.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b28135ecf6b7d446b43e27e225622a038cc4e2930a1022f51cdb97ada19b8e4d" -dependencies = [ - "bitflags 1.3.2", - "byteorder", - "diesel_derives", - "pq-sys", -] - -[[package]] -name = "diesel_derives" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "difflib" version = "0.4.0" @@ -813,7 +773,7 @@ dependencies = [ "svc-agent", "svc-authn", "svc-authz", - "svc-error", + "svc-error 0.6.0", "svc-events", "svc-nats-client", "svc-utils", @@ -1552,9 +1512,9 @@ dependencies = [ [[package]] name = "nkeys" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e66a7cd1358277b2a6f77078e70aea7315ff2f20db969cc61153103ec162594" +checksum = "c2d151f6ece2f3d1077f6c779268de2516653d8344ddde65addd785cce764fe5" dependencies = [ "byteorder", "data-encoding", @@ -1601,6 +1561,16 @@ dependencies = [ "rand", ] +[[package]] +name = "nuid" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b61b1710432e483e6a67b20b6c60c6afe0e2fad67aabba3bdb912f3f70ff6ae" +dependencies = [ + "once_cell", + "rand", +] + [[package]] name = "num-bigint" version = "0.2.6" @@ -1947,15 +1917,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pq-sys" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" -dependencies = [ - "vcpkg", -] - [[package]] name = "predicates" version = "2.1.5" @@ -2247,8 +2208,8 @@ dependencies = [ "pollster", "thiserror", "tokio", - "tokio-rustls", - "webpki 0.21.4", + "tokio-rustls 0.22.0", + "webpki", ] [[package]] @@ -2397,14 +2358,14 @@ dependencies = [ "log", "ring", "sct 0.6.1", - "webpki 0.21.4", + "webpki", ] [[package]] name = "rustls" -version = "0.21.2" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e32ca28af694bc1bbf399c33a516dbdf1c90090b8ab23c2bc24f834aa2247f5f" +checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" dependencies = [ "log", "ring", @@ -2435,9 +2396,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.100.1" +version = "0.101.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" +checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" dependencies = [ "ring", "untrusted", @@ -3064,9 +3025,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "svc-agent" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c90205083c87c6bc25990d5cdd26f98d8c431441e86dbd4e7421b28c428e04d" +checksum = "bef9c610b65f24bab61b52f5a7264983d0e014858e8496282aec7d688baf7869" dependencies = [ "async-channel", "base64 0.21.2", @@ -3089,7 +3050,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb5cf659f78c8fff863c17ac4e674829517919716eeecab602e8d2941e89c111" dependencies = [ "chrono", - "diesel", "http", "jsonwebtoken", "serde", @@ -3123,6 +3083,19 @@ name = "svc-error" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f841d7fd45d6f179e9f3765491fcb5eea100a5bbe50ea47faf3f262031966d9" +dependencies = [ + "anyhow", + "crossbeam-channel", + "http", + "serde", + "serde_derive", +] + +[[package]] +name = "svc-error" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad220c6bc89bc2e7b8af01db6dcfa4a513e18d78e7cf2f778e623ac22577eadf" dependencies = [ "anyhow", "crossbeam-channel", @@ -3140,9 +3113,9 @@ dependencies = [ [[package]] name = "svc-events" -version = "0.9.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a30b9e3f342270165a9e7543f9ba85a45bff586cc04f4d495ffd8b4a44447974" +checksum = "3ad84bd15a598b693df7dd08ca832c3414d59d6847f134f479ff547264669735" dependencies = [ "serde", "serde_json", @@ -3154,9 +3127,9 @@ dependencies = [ [[package]] name = "svc-nats-client" -version = "0.5.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c35276c77ae7ff528a5fb243f16bf95065a3ac8c91837356f2ee797c9a697203" +checksum = "cf7705838936003cae1b79e726be255ea9702b8aad516dec9c998c7c93ef6f8d" dependencies = [ "anyhow", "async-nats", @@ -3164,10 +3137,11 @@ dependencies = [ "futures", "futures-util", "humantime-serde", + "nuid 0.4.1", "reqwest", "serde", "svc-agent", - "svc-error", + "svc-error 0.6.0", "svc-events", "thiserror", "tokio", @@ -3177,9 +3151,9 @@ dependencies = [ [[package]] name = "svc-utils" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb63daf177825295faa99773d6fb6de26be0d3e9fbe427e3cac794358c000e99" +checksum = "b8443737f4d2444dc9e6a140a83d720685290d1ad763e75b59421958fa4a1a96" dependencies = [ "axum", "futures", @@ -3188,7 +3162,7 @@ dependencies = [ "prometheus", "svc-agent", "svc-authn", - "svc-error", + "svc-error 0.5.0", "tokio", "tower", "tower-http", @@ -3386,7 +3360,17 @@ checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.1", "tokio", - "webpki 0.21.4", + "webpki", +] + +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.5", + "tokio", ] [[package]] @@ -3803,16 +3787,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "whoami" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index b76d0dbe..76741829 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,13 +34,13 @@ serde_qs = "0.12" signal-hook = "0.3" signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] } sqlx = { version = "0.6", features = ["offline", "postgres", "macros", "uuid", "chrono", "json", "bigdecimal", "runtime-tokio-native-tls"] } -svc-agent = { version = "0.20", features = ["sqlx", "queue-counter"] } +svc-agent = { version = "0.21", features = ["sqlx", "queue-counter"] } svc-authn = { version = "0.8", features = ["jose", "sqlx"] } svc-authz = { version = "0.12" } -svc-error = { version = "0.5", features = ["sqlx", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] } -svc-utils = { version = "0.7", features = ["authn-extractor", "cors-middleware", "log-middleware"] } -svc-nats-client = "0.5" -svc-events = "0.9" +svc-error = { version = "0.6", features = ["sqlx", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] } +svc-utils = { version = "0.8", features = ["authn-extractor", "cors-middleware", "log-middleware"] } +svc-nats-client = "0.8" +svc-events = "0.11" tokio = { version = "1.28", features = ["full"] } tower = "0.4" tower-http = { version = "0.4", features = ["trace", "cors"] } diff --git a/src/app/context.rs b/src/app/context.rs index 8b02f462..d5bdbbaf 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -34,7 +34,7 @@ pub trait GlobalContext { fn metrics(&self) -> Arc; fn s3_client(&self) -> Option; fn broker_client(&self) -> &dyn BrokerClient; - fn nats_client(&self) -> Option<&dyn NatsClient>; + fn nats_client(&self) -> Option>; async fn get_conn(&self) -> Result, AppError> { self.db() @@ -121,8 +121,8 @@ impl GlobalContext for AppContext { self.broker_client.as_ref() } - fn nats_client(&self) -> Option<&dyn NatsClient> { - self.nats_client.as_deref() + fn nats_client(&self) -> Option> { + self.nats_client.clone() } } @@ -183,7 +183,7 @@ impl<'a, C: GlobalContext> GlobalContext for AppMessageContext<'a, C> { self.global_context.broker_client() } - fn nats_client(&self) -> Option<&dyn NatsClient> { + fn nats_client(&self) -> Option> { self.global_context.nats_client() } } diff --git a/src/app/stage/mod.rs b/src/app/stage/mod.rs index 14cdebb2..1aa7412e 100644 --- a/src/app/stage/mod.rs +++ b/src/app/stage/mod.rs @@ -205,3 +205,94 @@ async fn handle_ban_accepted( Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::test_helpers::prelude::*; + + #[tokio::test] + async fn ban_accepted_handler_enables_disables_ban() { + let db = TestDb::new().await; + let agent = TestAgent::new("web", "user", USR_AUDIENCE); + + let mut conn = db.get_conn().await; + let room = shared_helpers::insert_room(&mut conn).await; + + let subject = Subject::new("test".to_string(), room.classroom_id(), "ban".to_string()); + let event_id: EventId = ("ban".to_string(), "accepted".to_string(), 0).into(); + let headers = + svc_nats_client::test_helpers::HeadersBuilder::new(event_id, agent.agent_id().clone()) + .build(); + let e = BanAcceptedV1 { + ban: true, + classroom_id: room.classroom_id(), + target_account: agent.account_id().clone(), + operation_id: 0, + }; + + let ctx = TestContext::new(db, TestAuthz::new()); + + handle_ban_accepted(&ctx, e, &room, subject, &headers) + .await + .expect("handler failed"); + + // to drop pub_reqs after all checks + { + let pub_reqs = ctx.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 1); + + let payload = serde_json::from_slice::(&pub_reqs[0].payload) + .expect("failed to parse event"); + assert!(matches!( + payload, + Event::V1(EventV1::BanCollaborationCompleted( + BanCollaborationCompletedV1 { ban: true, .. } + )) + )); + } + + let room_bans = db::room_ban::ListQuery::new(room.id()) + .execute(&mut conn) + .await + .expect("failed to list room bans"); + + assert_eq!(room_bans.len(), 1); + assert_eq!(room_bans[0].account_id(), agent.account_id()); + + let subject = Subject::new("test".to_string(), room.classroom_id(), "ban".to_string()); + let e: BanAcceptedV1 = BanAcceptedV1 { + ban: false, + classroom_id: room.classroom_id(), + target_account: agent.account_id().clone(), + operation_id: 0, + }; + + handle_ban_accepted(&ctx, e, &room, subject, &headers) + .await + .expect("handler failed"); + + // to drop pub_reqs after all checks + { + let pub_reqs = ctx.inspect_nats_client().get_publish_requests(); + assert_eq!(pub_reqs.len(), 2); + + let payload = serde_json::from_slice::(&pub_reqs[1].payload) + .expect("failed to parse event"); + assert!(matches!( + payload, + Event::V1(EventV1::BanCollaborationCompleted( + BanCollaborationCompletedV1 { ban: false, .. } + )) + )); + } + + let room_bans = db::room_ban::ListQuery::new(room.id()) + .execute(&mut conn) + .await + .expect("failed to list room bans"); + + assert_eq!(room_bans.len(), 0); + } +} diff --git a/src/test_helpers/context.rs b/src/test_helpers/context.rs index 1db2b249..adad0abe 100644 --- a/src/test_helpers/context.rs +++ b/src/test_helpers/context.rs @@ -1,16 +1,12 @@ use std::sync::Arc; -use async_trait::async_trait; use chrono::{DateTime, Utc}; use prometheus::Registry; use serde_json::json; use sqlx::postgres::PgPool as Db; use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::cache::ConnectionPool as RedisConnectionPool; -use svc_nats_client::{ - AckPolicy, DeliverPolicy, Event, Message, MessageStream, Messages, NatsClient, PublishError, - Subject, SubscribeError, TermMessageError, -}; +use svc_nats_client::{test_helpers::TestNatsClient, NatsClient}; use crate::{ app::{ @@ -74,7 +70,7 @@ pub struct TestContext { start_timestamp: DateTime, s3_client: Option, broker_client: Arc, - nats_client: Option>, + nats_client: Arc, } impl TestContext { @@ -92,7 +88,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), - nats_client: Some(Arc::new(TestNatsClient {}) as Arc), + nats_client: Arc::new(TestNatsClient::new()), } } @@ -110,7 +106,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), - nats_client: Some(Arc::new(TestNatsClient {}) as Arc), + nats_client: Arc::new(TestNatsClient::new()), } } @@ -128,7 +124,7 @@ impl TestContext { start_timestamp: Utc::now(), s3_client: None, broker_client: Arc::new(MockBrokerClient::new()), - nats_client: Some(Arc::new(TestNatsClient {}) as Arc), + nats_client: Arc::new(TestNatsClient::new()), } } @@ -139,31 +135,9 @@ impl TestContext { pub fn broker_client_mock(&mut self) -> &mut MockBrokerClient { Arc::get_mut(&mut self.broker_client).expect("Failed to get broker client mock") } -} - -struct TestNatsClient; - -#[async_trait] -impl NatsClient for TestNatsClient { - async fn publish(&self, _event: &Event) -> Result<(), PublishError> { - Ok(()) - } - - async fn subscribe_durable(&self) -> Result { - unimplemented!() - } - - async fn subscribe_ephemeral( - &self, - _subject: Subject, - _deliver_policy: DeliverPolicy, - _ack_policy: AckPolicy, - ) -> Result { - unimplemented!() - } - async fn terminate(&self, _message: &Message) -> Result<(), TermMessageError> { - unimplemented!() + pub fn inspect_nats_client(&self) -> &TestNatsClient { + &self.nats_client } } @@ -208,8 +182,8 @@ impl GlobalContext for TestContext { self.broker_client.as_ref() } - fn nats_client(&self) -> Option<&dyn NatsClient> { - self.nats_client.as_deref() + fn nats_client(&self) -> Option> { + Some(self.nats_client.clone() as Arc) } }