Skip to content

Commit

Permalink
Drop unneeded HandshakeError::Server
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 25, 2023
1 parent 2e6fc91 commit 5b29020
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.12.2] - 2023-09-25

* Drop unneeded HandshakeError::Server

## [0.12.1] - 2023-09-25

* Change handshake timeout behavior (renamed to connect timeout).
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.12.1"
version = "0.12.2"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
20 changes: 12 additions & 8 deletions examples/mqtt-ws-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ntex::io::{Filter, Io};
use ntex::service::{chain_factory, ServiceFactory};
use ntex::util::{variant, Ready};
use ntex::ws;
use ntex_mqtt::{v3, v5, HandshakeError, MqttError, MqttServer};
use ntex_mqtt::{v3, v5, HandshakeError, MqttError, MqttServer, ProtocolError};
use ntex_tls::openssl::Acceptor;
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};

Expand Down Expand Up @@ -101,9 +101,11 @@ async fn main() -> std::io::Result<()> {
return match result {
Some(Protocol::Mqtt) => Ok(variant::Variant2::V1(io)),
Some(Protocol::Http) => Ok(variant::Variant2::V2(io)),
Some(Protocol::Unknown) => Err(MqttError::Handshake(
HandshakeError::Server("Unsupported protocol"),
)),
Some(Protocol::Unknown) => {
Err(MqttError::Handshake(HandshakeError::Protocol(
ProtocolError::generic_violation("Unsupported protocol"),
)))
}
None => {
// need to read more data
io.read_ready().await?;
Expand Down Expand Up @@ -140,8 +142,10 @@ async fn main() -> std::io::Result<()> {
)
.await?;
return Err(MqttError::Handshake(
HandshakeError::Server(
"WebSockets handshake error",
HandshakeError::Protocol(
ProtocolError::generic_violation(
"WebSockets handshake error",
),
),
));
}
Expand Down Expand Up @@ -178,8 +182,8 @@ async fn main() -> std::io::Result<()> {
// adapt service error to mqtt error
.map_err(|e| {
log::info!("Http server error: {:?}", e);
MqttError::Handshake(HandshakeError::Server(
"Http server error",
MqttError::Handshake(HandshakeError::Protocol(
ProtocolError::generic_violation("Http server error"),
))
})),
)
Expand Down
5 changes: 1 addition & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ pub enum HandshakeError<E> {
/// Peer disconnect
#[error("Peer is disconnected, error: {:?}", _0)]
Disconnected(Option<io::Error>),
/// Server error
#[error("Server error: {}", _0)]
Server(&'static str),
}

/// Protocol level errors
Expand Down Expand Up @@ -81,7 +78,7 @@ impl ProtocolError {
inner: ViolationInner::Common { reason, message },
})
}
pub(crate) fn generic_violation(message: &'static str) -> Self {
pub fn generic_violation(message: &'static str) -> Self {
Self::violation(DisconnectReasonCode::ProtocolError, message)
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod session;
mod types;
mod version;

pub use self::error::{HandshakeError, MqttError};
pub use self::error::{HandshakeError, MqttError, ProtocolError};
pub use self::server::MqttServer;
pub use self::session::Session;
pub use self::topic::{TopicFilter, TopicFilterError, TopicFilterLevel};
Expand Down
8 changes: 5 additions & 3 deletions src/v3/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ where
if !inner.inflight.borrow_mut().insert(pid) {
log::trace!("Duplicated packet id for publish packet: {:?}", pid);
return Either::Right(Either::Left(Ready::Err(MqttError::Handshake(
HandshakeError::Server("Duplicated packet id for publish packet"),
HandshakeError::Protocol(
ProtocolError::generic_violation("PUBLISH received with packet id that is already in use [MQTT-2.2.1-3]"))
))));
}
}
Expand Down Expand Up @@ -382,8 +383,9 @@ mod tests {
))));
let err = f.await.err().unwrap();
match err {
MqttError::Handshake(HandshakeError::Server(msg)) => {
assert!(msg == "Duplicated packet id for publish packet")
MqttError::Handshake(HandshakeError::Protocol(msg)) => {
assert!(format!("{}", msg)
.contains("PUBLISH received with packet id that is already in use"))
}
_ => panic!(),
}
Expand Down
6 changes: 3 additions & 3 deletions src/v3/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ where
BufferServiceError::Service(e) => {
MqttError::Handshake(HandshakeError::Service(E::from(e)))
}
BufferServiceError::RequestCanceled => MqttError::Handshake(
HandshakeError::Server("Request handling has been canceled"),
),
BufferServiceError::RequestCanceled => {
MqttError::Handshake(HandshakeError::Disconnected(None))
}
});

Ok(
Expand Down
7 changes: 5 additions & 2 deletions src/v3/selector.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, future::Future, marker, rc::Rc, task::Context, task::Poll};
use std::{fmt, future::Future, io, marker, rc::Rc, task::Context, task::Poll};

use ntex::io::{Filter, Io, IoBoxed};
use ntex::service::{boxed, Service, ServiceCtx, ServiceFactory};
Expand Down Expand Up @@ -325,7 +325,10 @@ where
}
}
log::error!("Cannot handle CONNECT packet {:?}", item.packet());
Err(MqttError::Handshake(HandshakeError::Server("Cannot handle CONNECT packet")))
Err(MqttError::Handshake(HandshakeError::Disconnected(Some(io::Error::new(
io::ErrorKind::Other,
"Cannot handle CONNECT packet",
)))))
})
}
}
6 changes: 3 additions & 3 deletions src/v5/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ where
BufferServiceError::Service(e) => {
MqttError::Handshake(HandshakeError::Service(E::from(e)))
}
BufferServiceError::RequestCanceled => MqttError::Handshake(
HandshakeError::Server("Request handling has been canceled"),
),
BufferServiceError::RequestCanceled => {
MqttError::Handshake(HandshakeError::Disconnected(None))
}
});

Ok(crate::inflight::InFlightService::new(
Expand Down
9 changes: 7 additions & 2 deletions src/v5/selector.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{convert::TryFrom, fmt, future::Future, marker, rc::Rc, task::Context, task::Poll};
use std::{
convert::TryFrom, fmt, future::Future, io, marker, rc::Rc, task::Context, task::Poll,
};

use ntex::io::{Filter, Io, IoBoxed};
use ntex::service::{boxed, Service, ServiceCtx, ServiceFactory};
Expand Down Expand Up @@ -328,7 +330,10 @@ where
}
}
log::error!("Cannot handle CONNECT packet {:?}", item);
Err(MqttError::Handshake(HandshakeError::Server("Cannot handle CONNECT packet")))
Err(MqttError::Handshake(HandshakeError::Disconnected(Some(io::Error::new(
io::ErrorKind::Other,
"Cannot handle CONNECT packet",
)))))
})
}
}

0 comments on commit 5b29020

Please sign in to comment.