diff --git a/relay_client/src/http.rs b/relay_client/src/http.rs index a615559..4e792fc 100644 --- a/relay_client/src/http.rs +++ b/relay_client/src/http.rs @@ -142,21 +142,13 @@ impl Client { /// when fully processed by the relay. /// Note: This function is experimental and will likely be removed in the /// future. - pub async fn subscribe_blocking(&self, topic: Topic) -> Response { - self.request(rpc::Subscribe { topic, block: true }).await + pub async fn subscribe_blocking(&self, topic: Topic) -> Response { + self.request(rpc::SubscribeBlocking { topic }).await } /// Unsubscribes from a topic. - pub async fn unsubscribe( - &self, - topic: Topic, - subscription_id: SubscriptionId, - ) -> Response { - self.request(rpc::Unsubscribe { - topic, - subscription_id, - }) - .await + pub async fn unsubscribe(&self, topic: Topic) -> Response { + self.request(rpc::Unsubscribe { topic }).await } /// Fetch mailbox messages for a specific topic. @@ -265,12 +257,18 @@ impl Client { pub async fn batch_subscribe_blocking( &self, topics: impl Into>, - ) -> Response { - self.request(rpc::BatchSubscribe { - topics: topics.into(), - block: true, - }) - .await + ) -> Result< + Vec>>, + Error, + > { + Ok(self + .request(rpc::BatchSubscribeBlocking { + topics: topics.into(), + }) + .await? + .into_iter() + .map(crate::convert_subscription_result) + .collect()) } /// Unsubscribes from multiple topics. diff --git a/relay_client/src/lib.rs b/relay_client/src/lib.rs index fe6f8e9..7408975 100644 --- a/relay_client/src/lib.rs +++ b/relay_client/src/lib.rs @@ -3,7 +3,8 @@ use { ::http::HeaderMap, relay_rpc::{ auth::{SerializedAuthToken, RELAY_WEBSOCKET_ADDRESS}, - domain::{MessageId, ProjectId}, + domain::{MessageId, ProjectId, SubscriptionId}, + rpc::{SubscriptionError, SubscriptionResult}, user_agent::UserAgent, }, serde::Serialize, @@ -170,6 +171,16 @@ impl Default for MessageIdGenerator { } } +#[inline] +fn convert_subscription_result( + res: SubscriptionResult, +) -> Result> { + match res { + SubscriptionResult::Id(id) => Ok(id), + SubscriptionResult::Error(err) => Err(ClientError::from(err).into()), + } +} + #[cfg(test)] mod tests { use { diff --git a/relay_client/src/websocket.rs b/relay_client/src/websocket.rs index bde3eda..9b02658 100644 --- a/relay_client/src/websocket.rs +++ b/relay_client/src/websocket.rs @@ -1,22 +1,28 @@ use { self::connection::{connection_event_loop, ConnectionControl}, - crate::{error::ClientError, ConnectionOptions}, + crate::{ + error::{ClientError, Error}, + ConnectionOptions, + }, relay_rpc::{ domain::{MessageId, SubscriptionId, Topic}, rpc::{ BatchFetchMessages, BatchReceiveMessages, BatchSubscribe, + BatchSubscribeBlocking, BatchUnsubscribe, FetchMessages, Publish, Receipt, Subscribe, + SubscribeBlocking, Subscription, + SubscriptionError, Unsubscribe, }, }, - std::{sync::Arc, time::Duration}, + std::{future::Future, sync::Arc, time::Duration}, tokio::sync::{ mpsc::{self, UnboundedSender}, oneshot, @@ -182,8 +188,8 @@ impl Client { /// when fully processed by the relay. /// Note: This function is experimental and will likely be removed in the /// future. - pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture { - let (request, response) = create_request(Subscribe { topic, block: true }); + pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture { + let (request, response) = create_request(SubscribeBlocking { topic }); self.request(request); @@ -191,15 +197,8 @@ impl Client { } /// Unsubscribes from a topic. - pub fn unsubscribe( - &self, - topic: Topic, - subscription_id: SubscriptionId, - ) -> EmptyResponseFuture { - let (request, response) = create_request(Unsubscribe { - topic, - subscription_id, - }); + pub fn unsubscribe(&self, topic: Topic) -> EmptyResponseFuture { + let (request, response) = create_request(Unsubscribe { topic }); self.request(request); @@ -240,15 +239,25 @@ impl Client { pub fn batch_subscribe_blocking( &self, topics: impl Into>, - ) -> ResponseFuture { - let (request, response) = create_request(BatchSubscribe { + ) -> impl Future< + Output = Result< + Vec>>, + Error, + >, + > { + let (request, response) = create_request(BatchSubscribeBlocking { topics: topics.into(), - block: true, }); self.request(request); - response + async move { + Ok(response + .await? + .into_iter() + .map(crate::convert_subscription_result) + .collect()) + } } /// Unsubscribes from multiple topics. diff --git a/relay_rpc/src/rpc.rs b/relay_rpc/src/rpc.rs index b9c1c4d..17db730 100644 --- a/relay_rpc/src/rpc.rs +++ b/relay_rpc/src/rpc.rs @@ -196,7 +196,7 @@ impl ErrorResponse { } /// Data structure representing error response params. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct ErrorData { /// Error code. pub code: i32, @@ -215,7 +215,9 @@ pub enum SubscriptionError { SubscriberLimitExceeded, } -/// Data structure representing subscribe request params. +/// Subscription request parameters. This request does not require the +/// subscription to be fully processed, and returns as soon as the server +/// receives it. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Subscribe { /// The topic to subscribe to. @@ -244,15 +246,36 @@ impl ServiceRequest for Subscribe { } } +/// Subscription request parameters. This request awaits the subscription to be +/// fully processed and returns possible errors. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct SubscribeBlocking { + /// The topic to subscribe to. + pub topic: Topic, +} + +impl ServiceRequest for SubscribeBlocking { + type Error = SubscriptionError; + type Response = SubscriptionId; + + fn validate(&self) -> Result<(), PayloadError> { + self.topic + .decode() + .map_err(|_| PayloadError::InvalidTopic)?; + + Ok(()) + } + + fn into_params(self) -> Params { + Params::SubscribeBlocking(self) + } +} + /// Data structure representing unsubscribe request params. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Unsubscribe { /// The topic to unsubscribe from. pub topic: Topic, - - /// The id of the subscription to unsubscribe from. - #[serde(rename = "id")] - pub subscription_id: SubscriptionId, } impl ServiceRequest for Unsubscribe { @@ -317,7 +340,9 @@ pub struct FetchResponse { pub has_more: bool, } -/// Multi-topic subscription request parameters. +/// Multi-topic subscription request parameters. This request does not require +/// all subscriptions to be fully processed, and returns as soon as the server +/// receives it. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct BatchSubscribe { /// The topics to subscribe to. @@ -329,12 +354,9 @@ pub struct BatchSubscribe { pub block: bool, } -impl ServiceRequest for BatchSubscribe { - type Error = SubscriptionError; - type Response = Vec; - - fn validate(&self) -> Result<(), PayloadError> { - let batch_size = self.topics.len(); +impl BatchSubscribe { + fn validate_topics(topics: &[Topic]) -> Result<(), PayloadError> { + let batch_size = topics.len(); if batch_size == 0 { return Err(PayloadError::BatchEmpty); @@ -344,18 +366,55 @@ impl ServiceRequest for BatchSubscribe { return Err(PayloadError::BatchLimitExceeded); } - for topic in &self.topics { + for topic in topics { topic.decode().map_err(|_| PayloadError::InvalidTopic)?; } Ok(()) } +} + +impl ServiceRequest for BatchSubscribe { + type Error = SubscriptionError; + type Response = Vec; + + fn validate(&self) -> Result<(), PayloadError> { + Self::validate_topics(&self.topics) + } fn into_params(self) -> Params { Params::BatchSubscribe(self) } } +/// Multi-topic subscription request parameters. This request awaits all +/// subscriptions to be fully processed and returns possible errors per topic. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct BatchSubscribeBlocking { + /// The topics to subscribe to. + pub topics: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum SubscriptionResult { + Id(SubscriptionId), + Error(ErrorData), +} + +impl ServiceRequest for BatchSubscribeBlocking { + type Error = SubscriptionError; + type Response = Vec; + + fn validate(&self) -> Result<(), PayloadError> { + BatchSubscribe::validate_topics(&self.topics) + } + + fn into_params(self) -> Params { + Params::BatchSubscribeBlocking(self) + } +} + /// Multi-topic unsubscription request parameters. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct BatchUnsubscribe { @@ -696,6 +755,10 @@ pub enum Params { #[serde(rename = "irn_subscribe", alias = "iridium_subscribe")] Subscribe(Subscribe), + /// Parameters to blocking subscribe. + #[serde(rename = "irn_subscribeBlocking", alias = "iridium_subscribeBlocking")] + SubscribeBlocking(SubscribeBlocking), + /// Parameters to unsubscribe. #[serde(rename = "irn_unsubscribe", alias = "iridium_unsubscribe")] Unsubscribe(Unsubscribe), @@ -708,6 +771,13 @@ pub enum Params { #[serde(rename = "irn_batchSubscribe", alias = "iridium_batchSubscribe")] BatchSubscribe(BatchSubscribe), + /// Parameters to blocking batch subscribe. + #[serde( + rename = "irn_batchSubscribeBlocking", + alias = "iridium_batchSubscribeBlocking" + )] + BatchSubscribeBlocking(BatchSubscribeBlocking), + /// Parameters to batch unsubscribe. #[serde(rename = "irn_batchUnsubscribe", alias = "iridium_batchUnsubscribe")] BatchUnsubscribe(BatchUnsubscribe), @@ -779,9 +849,11 @@ impl Request { match &self.params { Params::Subscribe(params) => params.validate(), + Params::SubscribeBlocking(params) => params.validate(), Params::Unsubscribe(params) => params.validate(), Params::FetchMessages(params) => params.validate(), Params::BatchSubscribe(params) => params.validate(), + Params::BatchSubscribeBlocking(params) => params.validate(), Params::BatchUnsubscribe(params) => params.validate(), Params::BatchFetchMessages(params) => params.validate(), Params::Publish(params) => params.validate(), diff --git a/relay_rpc/src/rpc/tests.rs b/relay_rpc/src/rpc/tests.rs index 0345b73..c3c1c29 100644 --- a/relay_rpc/src/rpc/tests.rs +++ b/relay_rpc/src/rpc/tests.rs @@ -220,12 +220,10 @@ fn deserialize_batch_methods() { "params": { "subscriptions": [ { - "topic": "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840", - "id": "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9841" + "topic": "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840" }, { - "topic": "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9842", - "id": "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9843" + "topic": "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9842" } ] } @@ -241,22 +239,39 @@ fn deserialize_batch_methods() { topic: Topic::from( "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840" ), - subscription_id: SubscriptionId::from( - "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9841" - ), }, Unsubscribe { topic: Topic::from( "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9842" ), - subscription_id: SubscriptionId::from( - "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9843" - ), } ] }) }) ); + + let serialized = + r#"{ "id": "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840" }"#; + assert_eq!( + serde_json::from_str::<'_, SubscriptionResult>(serialized).unwrap(), + SubscriptionResult::Id(SubscriptionId::from( + "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840" + )) + ); + + let serialized = r#"{ + "error": { + "code": -32600, + "message": "Invalid payload: The batch contains too many items", + "data": "BatchLimitExceeded" + } + }"#; + assert_eq!( + serde_json::from_str::<'_, SubscriptionResult>(serialized).unwrap(), + SubscriptionResult::Error( + Error::::Payload(PayloadError::BatchLimitExceeded).into() + ) + ); } #[test] @@ -353,7 +368,6 @@ fn validation() { jsonrpc: jsonrpc.clone(), params: Params::Unsubscribe(Unsubscribe { topic: topic.clone(), - subscription_id: subscription_id.clone(), }), }; assert_eq!(request.validate(), Ok(())); @@ -364,7 +378,6 @@ fn validation() { jsonrpc: jsonrpc.clone(), params: Params::Unsubscribe(Unsubscribe { topic: Topic::from("invalid"), - subscription_id: subscription_id.clone(), }), }; assert_eq!(request.validate(), Err(PayloadError::InvalidTopic)); @@ -491,10 +504,7 @@ fn validation() { id, jsonrpc: jsonrpc.clone(), params: Params::BatchUnsubscribe(BatchUnsubscribe { - subscriptions: vec![Unsubscribe { - topic, - subscription_id: subscription_id.clone(), - }], + subscriptions: vec![Unsubscribe { topic }], }), }; assert_eq!(request.validate(), Ok(())); @@ -513,7 +523,6 @@ fn validation() { let subscriptions = (0..MAX_SUBSCRIPTION_BATCH_SIZE + 1) .map(|_| Unsubscribe { topic: Topic::generate(), - subscription_id: SubscriptionId::generate(), }) .collect(); let request = Request { @@ -532,7 +541,6 @@ fn validation() { topic: Topic::from( "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c98401", ), - subscription_id, }], }), };