diff --git a/CHANGES.md b/CHANGES.md index 83f5be4..41a2ec1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [4.2.0] - 2024-10-31 + +* Call control service on readiness error + ## [4.1.1] - 2024-10-15 * Disconnect on error from service readiness check diff --git a/Cargo.toml b/Cargo.toml index c82a2a0..59390ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "4.1.1" +version = "4.2.0" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" @@ -18,7 +18,7 @@ features = ["ntex/tokio"] ntex-io = "2" ntex-net = "2" ntex-util = "2" -ntex-service = "3" +ntex-service = "3.2.1" ntex-bytes = "0.1" ntex-codec = "0.6" ntex-router = "0.5" diff --git a/src/error.rs b/src/error.rs index 87abef5..fbca7f9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,6 +10,9 @@ pub enum MqttError { /// Publish handler service error #[error("Service error")] Service(E), + /// Publish service readiness error + #[error("Service readiness error")] + Readiness(Option), /// Handshake error #[error("Mqtt handshake error: {}", _0)] Handshake(#[from] HandshakeError), diff --git a/src/v3/client/dispatcher.rs b/src/v3/client/dispatcher.rs index 6bbf2df..190e4b6 100644 --- a/src/v3/client/dispatcher.rs +++ b/src/v3/client/dispatcher.rs @@ -69,8 +69,18 @@ where #[inline] async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await; - res1.map_err(MqttError::Service)?; - res2 + if let Err(e) = res1 { + if res2.is_err() { + Err(MqttError::Readiness(Some(e))) + } else { + match ctx.call_nowait(&self.inner.control, Control::error(e)).await { + Ok(_) => Err(MqttError::Readiness(None)), + Err(err) => Err(err), + } + } + } else { + res2 + } } async fn shutdown(&self) { diff --git a/src/v3/dispatcher.rs b/src/v3/dispatcher.rs index c90db41..83a9a6f 100644 --- a/src/v3/dispatcher.rs +++ b/src/v3/dispatcher.rs @@ -128,8 +128,18 @@ where #[inline] async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await; - res1.map_err(|e| MqttError::Service(e.into()))?; - res2 + if let Err(e) = res1 { + if res2.is_err() { + Err(MqttError::Readiness(Some(e.into()))) + } else { + match ctx.call_nowait(&self.inner.control, Control::error(e.into())).await { + Ok(_) => Err(MqttError::Readiness(None)), + Err(err) => Err(err), + } + } + } else { + res2 + } } async fn shutdown(&self) { diff --git a/src/v5/client/dispatcher.rs b/src/v5/client/dispatcher.rs index 00861ee..9889e03 100644 --- a/src/v5/client/dispatcher.rs +++ b/src/v5/client/dispatcher.rs @@ -95,8 +95,18 @@ where #[inline] async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await; - res1.map_err(MqttError::Service)?; - res2 + if let Err(e) = res1 { + if res2.is_err() { + Err(MqttError::Readiness(Some(e))) + } else { + match ctx.call_nowait(&self.inner.control, Control::error(e)).await { + Ok(_) => Err(MqttError::Readiness(None)), + Err(err) => Err(err), + } + } + } else { + res2 + } } async fn shutdown(&self) { diff --git a/src/v5/dispatcher.rs b/src/v5/dispatcher.rs index 4d12c98..44d6ddb 100644 --- a/src/v5/dispatcher.rs +++ b/src/v5/dispatcher.rs @@ -136,8 +136,18 @@ where #[inline] async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { let (res1, res2) = join(ctx.ready(&self.publish), ctx.ready(&self.inner.control)).await; - res1.map_err(|e| MqttError::Service(e.into()))?; - res2 + if let Err(e) = res1 { + if res2.is_err() { + Err(MqttError::Readiness(Some(e.into()))) + } else { + match ctx.call_nowait(&self.inner.control, Control::error(e.into())).await { + Ok(_) => Err(MqttError::Readiness(None)), + Err(err) => Err(err), + } + } + } else { + res2 + } } async fn shutdown(&self) {