From c3df8c07ded9c8faa1ff32065a9ffec6bd0f21af Mon Sep 17 00:00:00 2001 From: Felix Obenhuber Date: Mon, 2 May 2022 09:26:00 +0200 Subject: [PATCH] Add subscribe and publish to plugin trait --- mqtt-v5-broker/src/broker.rs | 124 +++++++++++++++++++---------------- mqtt-v5-broker/src/main.rs | 38 +---------- mqtt-v5-broker/src/plugin.rs | 90 +++++++++++++++++++++---- 3 files changed, 145 insertions(+), 107 deletions(-) diff --git a/mqtt-v5-broker/src/broker.rs b/mqtt-v5-broker/src/broker.rs index f3554e1..cc56375 100644 --- a/mqtt-v5-broker/src/broker.rs +++ b/mqtt-v5-broker/src/broker.rs @@ -9,11 +9,10 @@ use mqtt_v5::{ types::{ properties::{AssignedClientIdentifier, SessionExpiryInterval}, AuthenticatePacket, ConnectAckPacket, ConnectPacket, ConnectReason, DisconnectReason, - FinalWill, Packet, ProtocolVersion, PublishAckPacket, PublishAckReason, - PublishCompletePacket, PublishCompleteReason, PublishPacket, PublishReceivedPacket, - PublishReceivedReason, PublishReleasePacket, PublishReleaseReason, QoS, SubscribeAckPacket, - SubscribeAckReason, SubscribePacket, UnsubscribeAckPacket, UnsubscribeAckReason, - UnsubscribePacket, + FinalWill, Packet, ProtocolVersion, PublishAckPacket, PublishCompletePacket, + PublishCompleteReason, PublishPacket, PublishReceivedPacket, PublishReleasePacket, + PublishReleaseReason, QoS, SubscribeAckPacket, SubscribeAckReason, SubscribePacket, + UnsubscribeAckPacket, UnsubscribeAckReason, UnsubscribePacket, }, }; use std::{ @@ -384,10 +383,22 @@ impl Broker { let subscriptions = &mut self.subscriptions; if let Some(session) = self.sessions.get_mut(&client_id) { + let plugin_ack = self.plugin.on_subscribe(&packet); + // If a Server receives a SUBSCRIBE packet containing a Topic Filter that // is identical to a Non‑shared Subscription’s Topic Filter for the current // Session, then it MUST replace that existing Subscription with a new Subscription. - for topic in &packet.subscription_topics { + for (topic, plugin_code) in + packet.subscription_topics.iter().zip(&plugin_ack.reason_codes) + { + // Check only subscriptions that the plugin didn't reject. + match plugin_code { + SubscribeAckReason::GrantedQoSZero + | SubscribeAckReason::GrantedQoSOne + | SubscribeAckReason::GrantedQoSTwo => (), + _ => continue, + } + let topic = &topic.topic_filter; // Unsubscribe the old session from all topics it subscribed to. session.subscription_tokens.retain(|(session_topic, token)| { @@ -405,27 +416,28 @@ impl Broker { let granted_qos_values = packet .subscription_topics .into_iter() - .map(|topic| { - let session_subscription = SessionSubscription { - client_id: client_id.clone(), - maximum_qos: topic.maximum_qos, - }; - let token = subscriptions.insert(&topic.topic_filter, session_subscription); - - session.subscription_tokens.push((topic.topic_filter.clone(), token)); - - match topic.maximum_qos { - QoS::AtMostOnce => SubscribeAckReason::GrantedQoSZero, - QoS::AtLeastOnce => SubscribeAckReason::GrantedQoSOne, - QoS::ExactlyOnce => SubscribeAckReason::GrantedQoSTwo, - } + .zip(plugin_ack.reason_codes) + .map(|(topic, plugin_reason)| match plugin_reason { + SubscribeAckReason::GrantedQoSZero + | SubscribeAckReason::GrantedQoSOne + | SubscribeAckReason::GrantedQoSTwo => { + let session_subscription = SessionSubscription { + client_id: client_id.clone(), + maximum_qos: topic.maximum_qos, + }; + let token = subscriptions.insert(&topic.topic_filter, session_subscription); + + session.subscription_tokens.push((topic.topic_filter.clone(), token)); + plugin_reason + }, + reason => reason, }) .collect(); let subscribe_ack = SubscribeAckPacket { packet_id: packet.packet_id, - reason_string: None, - user_properties: vec![], + reason_string: plugin_ack.reason_string, + user_properties: plugin_ack.user_properties, reason_codes: granted_qos_values, }; @@ -565,55 +577,51 @@ impl Broker { } async fn handle_publish(&mut self, client_id: String, packet: PublishPacket) { - let mut is_dup = false; - - // For QoS2, ensure this packet isn't delivered twice. So if we have an outgoing - // publish receive with the same ID, just send the publish receive again but don't forward - // the message. match packet.qos { - QoS::AtMostOnce => {}, + QoS::AtMostOnce => { + if self.plugin.on_publish_received_qos0(&packet) { + self.publish_message(packet).await; + } + }, QoS::AtLeastOnce => { if let Some(session) = self.sessions.get_mut(&client_id) { - let publish_ack = PublishAckPacket { - packet_id: packet - .packet_id - .expect("Packet with QoS 1 should have a packet ID"), - reason_code: PublishAckReason::Success, - reason_string: None, - user_properties: vec![], - }; - - session.send(ClientMessage::Packet(Packet::PublishAck(publish_ack))).await; + let (publish, publish_ack) = self.plugin.on_publish_received_qos1(&packet); + if let Some(publish_ack) = publish_ack { + session.send(ClientMessage::Packet(Packet::PublishAck(publish_ack))).await; + } + if publish { + self.publish_message(packet).await; + } } }, + // For QoS2, ensure this packet isn't delivered twice. So if we have an outgoing + // publish receive with the same ID, just send the publish receive again but don't forward + // the message. QoS::ExactlyOnce => { if let Some(session) = self.sessions.get_mut(&client_id) { - let packet_id = packet.packet_id.unwrap(); - is_dup = session.outgoing_publish_receives.contains(&packet_id); + let (mut publish, publish_rec) = self.plugin.on_publish_received_qos2(&packet); + + if let Some(publish_recv) = publish_rec { + let packet_id = publish_recv.packet_id; + let is_dup = session.outgoing_publish_receives.contains(&packet_id); + + publish = publish && !is_dup; - if !is_dup { - session.outgoing_publish_receives.push(packet_id) + if !is_dup { + session.outgoing_publish_receives.push(packet_id) + } + + session + .send(ClientMessage::Packet(Packet::PublishReceived(publish_recv))) + .await; } - let publish_recv = PublishReceivedPacket { - packet_id: packet - .packet_id - .expect("Packet with QoS 2 should have a packet ID"), - reason_code: PublishReceivedReason::Success, - reason_string: None, - user_properties: vec![], - }; - - session - .send(ClientMessage::Packet(Packet::PublishReceived(publish_recv))) - .await; + if publish { + self.publish_message(packet).await; + } } }, } - - if !is_dup { - self.publish_message(packet).await; - } } fn handle_publish_ack(&mut self, client_id: String, packet: PublishAckPacket) { diff --git a/mqtt-v5-broker/src/main.rs b/mqtt-v5-broker/src/main.rs index 1a61081..e9125c5 100644 --- a/mqtt-v5-broker/src/main.rs +++ b/mqtt-v5-broker/src/main.rs @@ -1,15 +1,11 @@ use std::{env, io}; use futures::future::try_join_all; -use log::{debug, info, trace}; -use mqtt_v5::types::{ - properties::{AuthenticationData, AuthenticationMethod}, - AuthenticatePacket, PublishPacket, SubscribePacket, -}; +use log::{debug, info}; use mqtt_v5_broker::{ broker::{Broker, BrokerMessage}, client, - plugin::{self}, + plugin::Noop, }; use tokio::{net::TcpListener, sync::mpsc::Sender, task}; @@ -49,39 +45,11 @@ fn init_logging() { } } -#[derive(Default)] -struct TracePlugin; - -impl plugin::Plugin for TracePlugin { - fn on_connect( - &mut self, - _: Option<&AuthenticationMethod>, - _: Option<&AuthenticationData>, - ) -> plugin::AuthentificationResult { - plugin::AuthentificationResult::Success - } - - fn on_authenticate(&mut self, packet: &AuthenticatePacket) -> plugin::AuthentificationResult { - trace!("Authenticate packet received: {:?}", packet); - plugin::AuthentificationResult::Success - } - - fn on_subscribe(&mut self, packet: &SubscribePacket) -> plugin::SubscribeResult { - trace!("Subscribe packet received: {:?}", packet); - plugin::SubscribeResult::Placeholder - } - - fn on_publish_received(&mut self, packet: &PublishPacket) -> plugin::PublishReceivedResult { - trace!("Publish packet received: {:?}", packet); - plugin::PublishReceivedResult::Placeholder - } -} - #[tokio::main] async fn main() -> Result<(), Box> { init_logging(); - let broker = Broker::with_plugin(TracePlugin::default()); + let broker = Broker::with_plugin(Noop); let broker_tx = broker.sender(); let broker = task::spawn(async { broker.run().await; diff --git a/mqtt-v5-broker/src/plugin.rs b/mqtt-v5-broker/src/plugin.rs index ec98611..3152473 100644 --- a/mqtt-v5-broker/src/plugin.rs +++ b/mqtt-v5-broker/src/plugin.rs @@ -1,3 +1,4 @@ +use log::{trace, warn}; use mqtt_v5::types::{ properties::{AuthenticationData, AuthenticationMethod}, *, @@ -15,14 +16,6 @@ pub enum AuthentificationResult { Packet(AuthenticatePacket), } -pub enum PublishReceivedResult { - Placeholder, -} - -pub enum SubscribeResult { - Placeholder, -} - /// Broker plugin pub trait Plugin { /// Called on connect packet reception @@ -35,8 +28,22 @@ pub trait Plugin { /// Called on authenticate packet reception fn on_authenticate(&mut self, packet: &AuthenticatePacket) -> AuthentificationResult; - fn on_subscribe(&mut self, packet: &SubscribePacket) -> SubscribeResult; - fn on_publish_received(&mut self, packet: &PublishPacket) -> PublishReceivedResult; + /// Called on subscribe packets reception + fn on_subscribe(&mut self, packet: &SubscribePacket) -> SubscribeAckPacket; + /// Called on publish packets reception for QoS 0. Return true if the packet should be published to the clients. + fn on_publish_received_qos0(&mut self, packet: &PublishPacket) -> bool; + /// Called on publish packets reception for QoS 1. Return if the packet should be published to the clients and + /// the publish ack packet to be sent to the publisher. + fn on_publish_received_qos1( + &mut self, + packet: &PublishPacket, + ) -> (bool, Option); + /// Called on publish packets reception for QoS 2. Return if the packet should be published to the clients and + /// the publish received packet to be sent to the publisher. + fn on_publish_received_qos2( + &mut self, + packet: &PublishPacket, + ) -> (bool, Option); } /// Default noop authenticator @@ -53,11 +60,66 @@ impl Plugin for Noop { AuthentificationResult::Success } - fn on_subscribe(&mut self, _: &SubscribePacket) -> SubscribeResult { - SubscribeResult::Placeholder + fn on_subscribe(&mut self, packet: &SubscribePacket) -> SubscribeAckPacket { + SubscribeAckPacket { + packet_id: packet.packet_id, + reason_string: None, + user_properties: vec![], + reason_codes: packet + .subscription_topics + .iter() + .inspect(|filter| { + trace!("Granting subscribe to {}", filter.topic_filter); + }) + .map(|filter| match filter.maximum_qos { + QoS::AtMostOnce => SubscribeAckReason::GrantedQoSZero, + QoS::AtLeastOnce => SubscribeAckReason::GrantedQoSOne, + QoS::ExactlyOnce => SubscribeAckReason::GrantedQoSTwo, + }) + .collect(), + } } - fn on_publish_received(&mut self, _: &PublishPacket) -> PublishReceivedResult { - PublishReceivedResult::Placeholder + fn on_publish_received_qos0(&mut self, packet: &PublishPacket) -> bool { + trace!("Granting QoS 0 publish on topic \"{}\"", packet.topic); + true + } + + fn on_publish_received_qos1( + &mut self, + packet: &PublishPacket, + ) -> (bool, Option) { + if let Some(packet_id) = packet.packet_id { + let ack = PublishAckPacket { + packet_id, + reason_code: PublishAckReason::Success, + reason_string: None, + user_properties: Vec::with_capacity(0), + }; + trace!("Granting QoS 1 publish on topic \"{}\"", packet.topic); + (true, Some(ack)) + } else { + warn!("Publish packet with QoS 1 without packet id"); + (false, None) + } + } + + fn on_publish_received_qos2( + &mut self, + packet: &PublishPacket, + ) -> (bool, Option) { + if let Some(packet_id) = packet.packet_id { + let ack = PublishReceivedPacket { + packet_id, + reason_code: PublishReceivedReason::Success, + reason_string: None, + user_properties: Vec::with_capacity(0), + }; + trace!("Granting QoS 2 publish on topic \"{}\"", packet.topic); + (true, Some(ack)) + } else { + warn!("Publish packet with QoS 2 without packet id"); + (false, None) + } } }