Skip to content

Commit

Permalink
Add subscribe and publish to plugin trait
Browse files Browse the repository at this point in the history
  • Loading branch information
flxo committed May 2, 2022
1 parent 8211692 commit c3df8c0
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 107 deletions.
124 changes: 66 additions & 58 deletions mqtt-v5-broker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ use mqtt_v5::{
types::{
properties::{AssignedClientIdentifier, SessionExpiryInterval},
AuthenticatePacket, ConnectAckPacket, ConnectPacket, ConnectReason, DisconnectReason,
FinalWill, Packet, ProtocolVersion, PublishAckPacket, PublishAckReason,
PublishCompletePacket, PublishCompleteReason, PublishPacket, PublishReceivedPacket,
PublishReceivedReason, PublishReleasePacket, PublishReleaseReason, QoS, SubscribeAckPacket,
SubscribeAckReason, SubscribePacket, UnsubscribeAckPacket, UnsubscribeAckReason,
UnsubscribePacket,
FinalWill, Packet, ProtocolVersion, PublishAckPacket, PublishCompletePacket,
PublishCompleteReason, PublishPacket, PublishReceivedPacket, PublishReleasePacket,
PublishReleaseReason, QoS, SubscribeAckPacket, SubscribeAckReason, SubscribePacket,
UnsubscribeAckPacket, UnsubscribeAckReason, UnsubscribePacket,
},
};
use std::{
Expand Down Expand Up @@ -384,10 +383,22 @@ impl<A: Plugin> Broker<A> {
let subscriptions = &mut self.subscriptions;

if let Some(session) = self.sessions.get_mut(&client_id) {
let plugin_ack = self.plugin.on_subscribe(&packet);

// If a Server receives a SUBSCRIBE packet containing a Topic Filter that
// is identical to a Non‑shared Subscription’s Topic Filter for the current
// Session, then it MUST replace that existing Subscription with a new Subscription.
for topic in &packet.subscription_topics {
for (topic, plugin_code) in
packet.subscription_topics.iter().zip(&plugin_ack.reason_codes)
{
// Check only subscriptions that the plugin didn't reject.
match plugin_code {
SubscribeAckReason::GrantedQoSZero
| SubscribeAckReason::GrantedQoSOne
| SubscribeAckReason::GrantedQoSTwo => (),
_ => continue,
}

let topic = &topic.topic_filter;
// Unsubscribe the old session from all topics it subscribed to.
session.subscription_tokens.retain(|(session_topic, token)| {
Expand All @@ -405,27 +416,28 @@ impl<A: Plugin> Broker<A> {
let granted_qos_values = packet
.subscription_topics
.into_iter()
.map(|topic| {
let session_subscription = SessionSubscription {
client_id: client_id.clone(),
maximum_qos: topic.maximum_qos,
};
let token = subscriptions.insert(&topic.topic_filter, session_subscription);

session.subscription_tokens.push((topic.topic_filter.clone(), token));

match topic.maximum_qos {
QoS::AtMostOnce => SubscribeAckReason::GrantedQoSZero,
QoS::AtLeastOnce => SubscribeAckReason::GrantedQoSOne,
QoS::ExactlyOnce => SubscribeAckReason::GrantedQoSTwo,
}
.zip(plugin_ack.reason_codes)
.map(|(topic, plugin_reason)| match plugin_reason {
SubscribeAckReason::GrantedQoSZero
| SubscribeAckReason::GrantedQoSOne
| SubscribeAckReason::GrantedQoSTwo => {
let session_subscription = SessionSubscription {
client_id: client_id.clone(),
maximum_qos: topic.maximum_qos,
};
let token = subscriptions.insert(&topic.topic_filter, session_subscription);

session.subscription_tokens.push((topic.topic_filter.clone(), token));
plugin_reason
},
reason => reason,
})
.collect();

let subscribe_ack = SubscribeAckPacket {
packet_id: packet.packet_id,
reason_string: None,
user_properties: vec![],
reason_string: plugin_ack.reason_string,
user_properties: plugin_ack.user_properties,
reason_codes: granted_qos_values,
};

Expand Down Expand Up @@ -565,55 +577,51 @@ impl<A: Plugin> Broker<A> {
}

async fn handle_publish(&mut self, client_id: String, packet: PublishPacket) {
let mut is_dup = false;

// For QoS2, ensure this packet isn't delivered twice. So if we have an outgoing
// publish receive with the same ID, just send the publish receive again but don't forward
// the message.
match packet.qos {
QoS::AtMostOnce => {},
QoS::AtMostOnce => {
if self.plugin.on_publish_received_qos0(&packet) {
self.publish_message(packet).await;
}
},
QoS::AtLeastOnce => {
if let Some(session) = self.sessions.get_mut(&client_id) {
let publish_ack = PublishAckPacket {
packet_id: packet
.packet_id
.expect("Packet with QoS 1 should have a packet ID"),
reason_code: PublishAckReason::Success,
reason_string: None,
user_properties: vec![],
};

session.send(ClientMessage::Packet(Packet::PublishAck(publish_ack))).await;
let (publish, publish_ack) = self.plugin.on_publish_received_qos1(&packet);
if let Some(publish_ack) = publish_ack {
session.send(ClientMessage::Packet(Packet::PublishAck(publish_ack))).await;
}
if publish {
self.publish_message(packet).await;
}
}
},
// For QoS2, ensure this packet isn't delivered twice. So if we have an outgoing
// publish receive with the same ID, just send the publish receive again but don't forward
// the message.
QoS::ExactlyOnce => {
if let Some(session) = self.sessions.get_mut(&client_id) {
let packet_id = packet.packet_id.unwrap();
is_dup = session.outgoing_publish_receives.contains(&packet_id);
let (mut publish, publish_rec) = self.plugin.on_publish_received_qos2(&packet);

if let Some(publish_recv) = publish_rec {
let packet_id = publish_recv.packet_id;
let is_dup = session.outgoing_publish_receives.contains(&packet_id);

publish = publish && !is_dup;

if !is_dup {
session.outgoing_publish_receives.push(packet_id)
if !is_dup {
session.outgoing_publish_receives.push(packet_id)
}

session
.send(ClientMessage::Packet(Packet::PublishReceived(publish_recv)))
.await;
}

let publish_recv = PublishReceivedPacket {
packet_id: packet
.packet_id
.expect("Packet with QoS 2 should have a packet ID"),
reason_code: PublishReceivedReason::Success,
reason_string: None,
user_properties: vec![],
};

session
.send(ClientMessage::Packet(Packet::PublishReceived(publish_recv)))
.await;
if publish {
self.publish_message(packet).await;
}
}
},
}

if !is_dup {
self.publish_message(packet).await;
}
}

fn handle_publish_ack(&mut self, client_id: String, packet: PublishAckPacket) {
Expand Down
38 changes: 3 additions & 35 deletions mqtt-v5-broker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::{env, io};

use futures::future::try_join_all;
use log::{debug, info, trace};
use mqtt_v5::types::{
properties::{AuthenticationData, AuthenticationMethod},
AuthenticatePacket, PublishPacket, SubscribePacket,
};
use log::{debug, info};
use mqtt_v5_broker::{
broker::{Broker, BrokerMessage},
client,
plugin::{self},
plugin::Noop,
};
use tokio::{net::TcpListener, sync::mpsc::Sender, task};

Expand Down Expand Up @@ -49,39 +45,11 @@ fn init_logging() {
}
}

#[derive(Default)]
struct TracePlugin;

impl plugin::Plugin for TracePlugin {
fn on_connect(
&mut self,
_: Option<&AuthenticationMethod>,
_: Option<&AuthenticationData>,
) -> plugin::AuthentificationResult {
plugin::AuthentificationResult::Success
}

fn on_authenticate(&mut self, packet: &AuthenticatePacket) -> plugin::AuthentificationResult {
trace!("Authenticate packet received: {:?}", packet);
plugin::AuthentificationResult::Success
}

fn on_subscribe(&mut self, packet: &SubscribePacket) -> plugin::SubscribeResult {
trace!("Subscribe packet received: {:?}", packet);
plugin::SubscribeResult::Placeholder
}

fn on_publish_received(&mut self, packet: &PublishPacket) -> plugin::PublishReceivedResult {
trace!("Publish packet received: {:?}", packet);
plugin::PublishReceivedResult::Placeholder
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logging();

let broker = Broker::with_plugin(TracePlugin::default());
let broker = Broker::with_plugin(Noop);
let broker_tx = broker.sender();
let broker = task::spawn(async {
broker.run().await;
Expand Down
90 changes: 76 additions & 14 deletions mqtt-v5-broker/src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use log::{trace, warn};
use mqtt_v5::types::{
properties::{AuthenticationData, AuthenticationMethod},
*,
Expand All @@ -15,14 +16,6 @@ pub enum AuthentificationResult {
Packet(AuthenticatePacket),
}

pub enum PublishReceivedResult {
Placeholder,
}

pub enum SubscribeResult {
Placeholder,
}

/// Broker plugin
pub trait Plugin {
/// Called on connect packet reception
Expand All @@ -35,8 +28,22 @@ pub trait Plugin {
/// Called on authenticate packet reception
fn on_authenticate(&mut self, packet: &AuthenticatePacket) -> AuthentificationResult;

fn on_subscribe(&mut self, packet: &SubscribePacket) -> SubscribeResult;
fn on_publish_received(&mut self, packet: &PublishPacket) -> PublishReceivedResult;
/// Called on subscribe packets reception
fn on_subscribe(&mut self, packet: &SubscribePacket) -> SubscribeAckPacket;
/// Called on publish packets reception for QoS 0. Return true if the packet should be published to the clients.
fn on_publish_received_qos0(&mut self, packet: &PublishPacket) -> bool;
/// Called on publish packets reception for QoS 1. Return if the packet should be published to the clients and
/// the publish ack packet to be sent to the publisher.
fn on_publish_received_qos1(
&mut self,
packet: &PublishPacket,
) -> (bool, Option<PublishAckPacket>);
/// Called on publish packets reception for QoS 2. Return if the packet should be published to the clients and
/// the publish received packet to be sent to the publisher.
fn on_publish_received_qos2(
&mut self,
packet: &PublishPacket,
) -> (bool, Option<PublishReceivedPacket>);
}

/// Default noop authenticator
Expand All @@ -53,11 +60,66 @@ impl Plugin for Noop {
AuthentificationResult::Success
}

fn on_subscribe(&mut self, _: &SubscribePacket) -> SubscribeResult {
SubscribeResult::Placeholder
fn on_subscribe(&mut self, packet: &SubscribePacket) -> SubscribeAckPacket {
SubscribeAckPacket {
packet_id: packet.packet_id,
reason_string: None,
user_properties: vec![],
reason_codes: packet
.subscription_topics
.iter()
.inspect(|filter| {
trace!("Granting subscribe to {}", filter.topic_filter);
})
.map(|filter| match filter.maximum_qos {
QoS::AtMostOnce => SubscribeAckReason::GrantedQoSZero,
QoS::AtLeastOnce => SubscribeAckReason::GrantedQoSOne,
QoS::ExactlyOnce => SubscribeAckReason::GrantedQoSTwo,
})
.collect(),
}
}

fn on_publish_received(&mut self, _: &PublishPacket) -> PublishReceivedResult {
PublishReceivedResult::Placeholder
fn on_publish_received_qos0(&mut self, packet: &PublishPacket) -> bool {
trace!("Granting QoS 0 publish on topic \"{}\"", packet.topic);
true
}

fn on_publish_received_qos1(
&mut self,
packet: &PublishPacket,
) -> (bool, Option<PublishAckPacket>) {
if let Some(packet_id) = packet.packet_id {
let ack = PublishAckPacket {
packet_id,
reason_code: PublishAckReason::Success,
reason_string: None,
user_properties: Vec::with_capacity(0),
};
trace!("Granting QoS 1 publish on topic \"{}\"", packet.topic);
(true, Some(ack))
} else {
warn!("Publish packet with QoS 1 without packet id");
(false, None)
}
}

fn on_publish_received_qos2(
&mut self,
packet: &PublishPacket,
) -> (bool, Option<PublishReceivedPacket>) {
if let Some(packet_id) = packet.packet_id {
let ack = PublishReceivedPacket {
packet_id,
reason_code: PublishReceivedReason::Success,
reason_string: None,
user_properties: Vec::with_capacity(0),
};
trace!("Granting QoS 2 publish on topic \"{}\"", packet.topic);
(true, Some(ack))
} else {
warn!("Publish packet with QoS 2 without packet id");
(false, None)
}
}
}

0 comments on commit c3df8c0

Please sign in to comment.