From e70568b197eac7abb099030bb74135d125a1ee7c Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 14 May 2024 15:59:58 +0500 Subject: [PATCH] better naming --- CHANGES.md | 4 ++ Cargo.toml | 2 +- src/v3/client/connector.rs | 32 ++++++++------ src/v3/dispatcher.rs | 8 ++-- src/v3/handshake.rs | 21 ++++++--- src/v3/server.rs | 90 ++++++++++++++++++++++---------------- src/v5/client/connector.rs | 8 +++- src/v5/server.rs | 36 ++++++++++----- tests/test_server_v5.rs | 2 +- 9 files changed, 127 insertions(+), 76 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 4f27dbb..8ac0c2d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.1] - 2024-05-14 + +* Better naming + ## [2.0.0] - 2024-05-1x * Mark `Control` type as `non exhaustive` diff --git a/Cargo.toml b/Cargo.toml index 441c959..e8efe30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "2.0.0" +version = "2.0.1" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" diff --git a/src/v3/client/connector.rs b/src/v3/client/connector.rs index 8fa63fd..1978bed 100644 --- a/src/v3/client/connector.rs +++ b/src/v3/client/connector.rs @@ -14,9 +14,9 @@ pub struct MqttConnector { address: A, connector: Pipeline, pkt: codec::Connect, + max_size: u32, max_send: usize, max_receive: usize, - max_packet_size: u32, handshake_timeout: Seconds, config: DispatcherConfig, pool: Rc, @@ -37,9 +37,9 @@ where config, pkt: codec::Connect::default(), connector: Pipeline::new(Connector::default()), + max_size: 64 * 1024, max_send: 16, max_receive: 16, - max_packet_size: 64 * 1024, handshake_timeout: Seconds::ZERO, pool: Rc::new(MqttSinkPool::default()), } @@ -102,10 +102,19 @@ where self } + #[inline] + /// Max incoming packet size. + /// + /// To disable max size limit set value to 0. + pub fn max_size(mut self, val: u32) -> Self { + self.max_size = val; + self + } + #[inline] /// Set max send packets number /// - /// Number of in-flight outgoing publish packets. By default receive max is set to 16 packets. + /// Number of in-flight outgoing publish packets. By default send max is set to 16 packets. /// To disable in-flight limit set value to 0. pub fn max_send(mut self, val: u16) -> Self { self.max_send = val as usize; @@ -113,21 +122,18 @@ where } #[inline] - /// Set max receive packets number + /// Number of inbound in-flight concurrent messages. /// - /// Number of in-flight incoming publish packets. By default receive max is set to 16 packets. - /// To disable in-flight limit set value to 0. + /// By default inbound is set to 16 messages To disable in-flight limit set value to 0. pub fn max_receive(mut self, val: u16) -> Self { self.max_receive = val as usize; self } - #[inline] - /// Max incoming packet size. - /// - /// To disable max size limit set value to 0. + #[deprecated] + #[doc(hidden)] pub fn max_packet_size(mut self, val: u32) -> Self { - self.max_packet_size = val; + self.max_size = val; self } @@ -184,9 +190,9 @@ where pkt: self.pkt, address: self.address, config: self.config, + max_size: self.max_size, max_send: self.max_send, max_receive: self.max_receive, - max_packet_size: self.max_packet_size, handshake_timeout: self.handshake_timeout, pool: self.pool, } @@ -216,7 +222,7 @@ where let config = self.config.clone(); let pool = self.pool.clone(); let codec = codec::Codec::new(); - codec.set_max_size(self.max_packet_size); + codec.set_max_size(self.max_size); io.encode(pkt.into(), &codec)?; diff --git a/src/v3/dispatcher.rs b/src/v3/dispatcher.rs index 3a655e0..3187939 100644 --- a/src/v3/dispatcher.rs +++ b/src/v3/dispatcher.rs @@ -16,8 +16,8 @@ use super::{codec, publish::Publish, shared::Ack, shared::MqttShared, Session}; pub(super) fn factory( publish: T, control: C, - inflight: u16, - inflight_size: usize, + inbound: u16, + inbound_size: usize, max_qos: QoS, handle_qos_after_disconnect: Option, ) -> impl ServiceFactory< @@ -62,8 +62,8 @@ where Ok( // limit number of in-flight messages crate::inflight::InFlightService::new( - inflight, - inflight_size, + inbound, + inbound_size, Dispatcher::<_, _, E>::new( sink, publish, diff --git a/src/v3/handshake.rs b/src/v3/handshake.rs index 5ae0c71..1bf6c95 100644 --- a/src/v3/handshake.rs +++ b/src/v3/handshake.rs @@ -66,7 +66,7 @@ impl Handshake { keepalive, session_present, session: Some(st), - outgoing: None, + max_send: None, return_code: mqtt::ConnectAckReason::ConnectionAccepted, } } @@ -79,7 +79,7 @@ impl Handshake { session: None, session_present: false, keepalive: DEFAULT_KEEPALIVE, - outgoing: None, + max_send: None, return_code: mqtt::ConnectAckReason::IdentifierRejected, } } @@ -91,7 +91,7 @@ impl Handshake { shared: self.shared, session: None, session_present: false, - outgoing: None, + max_send: None, keepalive: DEFAULT_KEEPALIVE, return_code: mqtt::ConnectAckReason::BadUserNameOrPassword, } @@ -104,7 +104,7 @@ impl Handshake { shared: self.shared, session: None, session_present: false, - outgoing: None, + max_send: None, keepalive: DEFAULT_KEEPALIVE, return_code: mqtt::ConnectAckReason::NotAuthorized, } @@ -117,7 +117,7 @@ impl Handshake { shared: self.shared, session: None, session_present: false, - outgoing: None, + max_send: None, keepalive: DEFAULT_KEEPALIVE, return_code: mqtt::ConnectAckReason::ServiceUnavailable, } @@ -138,7 +138,7 @@ pub struct HandshakeAck { pub(crate) return_code: mqtt::ConnectAckReason, pub(crate) shared: Rc, pub(crate) keepalive: Seconds, - pub(crate) outgoing: Option, + pub(crate) max_send: Option, } impl HandshakeAck { @@ -153,8 +153,15 @@ impl HandshakeAck { /// Number of outgoing concurrent messages. /// /// By default outgoing is set to 16 messages + pub fn max_send(mut self, val: u16) -> Self { + self.max_send = Some(val); + self + } + + #[deprecated] + #[doc(hidden)] pub fn max_outgoing(mut self, val: u16) -> Self { - self.outgoing = Some(val); + self.max_send = Some(val); self } } diff --git a/src/v3/server.rs b/src/v3/server.rs index bc4c0a3..f228b6c 100644 --- a/src/v3/server.rs +++ b/src/v3/server.rs @@ -46,10 +46,10 @@ pub struct MqttServer { publish: P, max_qos: QoS, max_size: u32, - max_inflight: u16, - max_inflight_size: usize, - max_outgoing: u16, - max_outgoing_size: (u32, u32), + max_receive: u16, + max_receive_size: usize, + max_send: u16, + max_send_size: (u32, u32), handle_qos_after_disconnect: Option, connect_timeout: Seconds, config: DispatcherConfig, @@ -79,10 +79,10 @@ where publish: DefaultPublishService::default(), max_qos: QoS::AtLeastOnce, max_size: 0, - max_inflight: 16, - max_inflight_size: 65535, - max_outgoing: 16, - max_outgoing_size: (65535, 512), + max_receive: 16, + max_receive_size: 65535, + max_send: 16, + max_send_size: (65535, 512), handle_qos_after_disconnect: None, connect_timeout: Seconds::ZERO, pool: Default::default(), @@ -155,35 +155,49 @@ where self } - /// Number of in-flight concurrent messages. + /// Number of inbound in-flight concurrent messages. /// - /// By default in-flight is set to 16 messages + /// By default inbound is set to 16 messages + pub fn max_receive(mut self, val: u16) -> Self { + self.max_receive = val; + self + } + + #[deprecated] + #[doc(hidden)] pub fn max_inflight(mut self, val: u16) -> Self { - self.max_inflight = val; + self.max_receive = val; self } - /// Total size of in-flight messages. + /// Total size of inbound in-flight messages. /// - /// By default total in-flight size is set to 64Kb + /// By default total inbound in-flight size is set to 64Kb + pub fn max_receive_size(mut self, val: usize) -> Self { + self.max_receive_size = val; + self + } + + #[deprecated] + #[doc(hidden)] pub fn max_inflight_size(mut self, val: usize) -> Self { - self.max_inflight_size = val; + self.max_receive_size = val; self } /// Number of outgoing concurrent messages. /// /// By default outgoing is set to 16 messages - pub fn max_outgoing(mut self, val: u16) -> Self { - self.max_outgoing = val; + pub fn max_send(mut self, val: u16) -> Self { + self.max_send = val; self } /// Total size of outgoing messages. /// /// By default total outgoing size is set to 64Kb - pub fn max_outgoing_size(mut self, val: u32) -> Self { - self.max_outgoing_size = (val, val / 10); + pub fn max_send_size(mut self, val: u32) -> Self { + self.max_send_size = (val, val / 10); self } @@ -230,10 +244,10 @@ where config: self.config, max_qos: self.max_qos, max_size: self.max_size, - max_inflight: self.max_inflight, - max_inflight_size: self.max_inflight_size, - max_outgoing: self.max_outgoing, - max_outgoing_size: self.max_outgoing_size, + max_receive: self.max_receive, + max_receive_size: self.max_receive_size, + max_send: self.max_send, + max_send_size: self.max_send_size, handle_qos_after_disconnect: self.handle_qos_after_disconnect, connect_timeout: self.connect_timeout, pool: self.pool, @@ -255,10 +269,10 @@ where config: self.config, max_qos: self.max_qos, max_size: self.max_size, - max_inflight: self.max_inflight, - max_inflight_size: self.max_inflight_size, - max_outgoing: self.max_outgoing, - max_outgoing_size: self.max_outgoing_size, + max_receive: self.max_receive, + max_receive_size: self.max_receive_size, + max_send: self.max_send, + max_send_size: self.max_send_size, handle_qos_after_disconnect: self.handle_qos_after_disconnect, connect_timeout: self.connect_timeout, pool: self.pool, @@ -290,8 +304,8 @@ where HandshakeFactory { factory: self.handshake, max_size: self.max_size, - max_outgoing: self.max_outgoing, - max_outgoing_size: self.max_outgoing_size, + max_send: self.max_send, + max_send_size: self.max_send_size, connect_timeout: self.connect_timeout, pool: self.pool.clone(), _t: PhantomData, @@ -299,8 +313,8 @@ where factory( self.publish, self.control, - self.max_inflight, - self.max_inflight_size, + self.max_receive, + self.max_receive_size, self.max_qos, self.handle_qos_after_disconnect, ), @@ -312,8 +326,8 @@ where struct HandshakeFactory { factory: H, max_size: u32, - max_outgoing: u16, - max_outgoing_size: (u32, u32), + max_send: u16, + max_send_size: (u32, u32), connect_timeout: Seconds, pool: Rc, _t: PhantomData, @@ -333,8 +347,8 @@ where async fn create(&self, _: ()) -> Result { Ok(HandshakeService { max_size: self.max_size, - max_outgoing: self.max_outgoing, - max_outgoing_size: self.max_outgoing_size, + max_send: self.max_send, + max_send_size: self.max_send_size, pool: self.pool.clone(), service: self.factory.create(()).await?, connect_timeout: self.connect_timeout.into(), @@ -346,8 +360,8 @@ where struct HandshakeService { service: H, max_size: u32, - max_outgoing: u16, - max_outgoing_size: (u32, u32), + max_send: u16, + max_send_size: (u32, u32), pool: Rc, connect_timeout: Millis, _t: PhantomData, @@ -371,7 +385,7 @@ where ) -> Result { log::trace!("Starting mqtt v3 handshake"); - let (h, l) = self.max_outgoing_size; + let (h, l) = self.max_send_size; io.memory_pool().set_write_params(h, l); let codec = mqtt::Codec::default(); @@ -408,7 +422,7 @@ where log::trace!("Sending success handshake ack: {:#?}", pkt); - ack.shared.set_cap(ack.outgoing.unwrap_or(self.max_outgoing) as usize); + ack.shared.set_cap(ack.max_send.unwrap_or(self.max_send) as usize); ack.io.encode(pkt, &ack.shared.codec)?; Ok(( ack.io, diff --git a/src/v5/client/connector.rs b/src/v5/client/connector.rs index 7b8a400..f0ebba1 100644 --- a/src/v5/client/connector.rs +++ b/src/v5/client/connector.rs @@ -118,7 +118,7 @@ where /// /// Number of in-flight incoming publish packets. By default receive max is set to 16 packets. /// To disable in-flight limit set value to 0. - pub fn receive_max(mut self, val: u16) -> Self { + pub fn max_receive(mut self, val: u16) -> Self { if let Some(val) = NonZeroU16::new(val) { self.pkt.receive_max = Some(val); } else { @@ -127,6 +127,12 @@ where self } + #[deprecated] + #[doc(hidden)] + pub fn receive_max(self, val: u16) -> Self { + self.max_receive(val) + } + #[inline] /// Update connect user properties pub fn properties(mut self, f: F) -> Self diff --git a/src/v5/server.rs b/src/v5/server.rs index 265c778..6860e8d 100644 --- a/src/v5/server.rs +++ b/src/v5/server.rs @@ -19,10 +19,10 @@ pub struct MqttServer { handshake: C, srv_control: Cn, srv_publish: P, + max_qos: QoS, max_size: u32, max_receive: u16, - max_qos: QoS, - max_inflight_size: usize, + max_receive_size: usize, max_topic_alias: u16, handle_qos_after_disconnect: Option, connect_timeout: Seconds, @@ -50,10 +50,10 @@ where handshake: handshake.into_factory(), srv_control: DefaultControlService::default(), srv_publish: DefaultPublishService::default(), + max_qos: QoS::AtLeastOnce, max_size: 0, max_receive: 15, - max_qos: QoS::AtLeastOnce, - max_inflight_size: 65535, + max_receive_size: 65535, max_topic_alias: 32, handle_qos_after_disconnect: None, connect_timeout: Seconds::ZERO, @@ -121,6 +121,21 @@ where /// /// Number of in-flight publish packets. By default receive max is set to 15 packets. /// To disable timeout set value to 0. + pub fn max_receive(mut self, val: u16) -> Self { + self.max_receive = val; + self + } + + /// Total size of received in-flight messages. + /// + /// By default total in-flight size is set to 64Kb + pub fn max_receive_size(mut self, val: usize) -> Self { + self.max_receive_size = val; + self + } + + #[deprecated] + #[doc(hidden)] pub fn receive_max(mut self, val: u16) -> Self { self.max_receive = val; self @@ -142,11 +157,10 @@ where self } - /// Total size of in-flight messages. - /// - /// By default total in-flight size is set to 64Kb + #[deprecated] + #[doc(hidden)] pub fn max_inflight_size(mut self, val: usize) -> Self { - self.max_inflight_size = val; + self.max_receive_size = val; self } @@ -192,7 +206,7 @@ where max_receive: self.max_receive, max_topic_alias: self.max_topic_alias, max_qos: self.max_qos, - max_inflight_size: self.max_inflight_size, + max_receive_size: self.max_receive_size, handle_qos_after_disconnect: self.handle_qos_after_disconnect, connect_timeout: self.connect_timeout, pool: self.pool, @@ -218,7 +232,7 @@ where max_receive: self.max_receive, max_topic_alias: self.max_topic_alias, max_qos: self.max_qos, - max_inflight_size: self.max_inflight_size, + max_receive_size: self.max_receive_size, handle_qos_after_disconnect: self.handle_qos_after_disconnect, connect_timeout: self.connect_timeout, pool: self.pool, @@ -275,7 +289,7 @@ where factory( self.srv_publish, self.srv_control, - self.max_inflight_size, + self.max_receive_size, self.handle_qos_after_disconnect, ), self.config, diff --git a/tests/test_server_v5.rs b/tests/test_server_v5.rs index a42a702..fda298a 100644 --- a/tests/test_server_v5.rs +++ b/tests/test_server_v5.rs @@ -491,7 +491,7 @@ async fn test_dups() { async fn test_max_receive() { let srv = server::test_server(move || { MqttServer::new(handshake) - .receive_max(1) + .max_receive(1) .max_qos(codec::QoS::AtLeastOnce) .publish(|p: Publish| async move { sleep(Duration::from_millis(10000)).await;