Skip to content

Commit

Permalink
cloudevents#9 Encoders for MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
sbcd90 committed Dec 9, 2020
1 parent a3ad7a9 commit 6405dbb
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 48 deletions.
7 changes: 2 additions & 5 deletions cloudevents-sdk-paho-mqtt/src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ lazy_static! {
}

pub(crate) static SPEC_VERSION_HEADER: &'static str = "ce_specversion";
pub(crate) static MQTT_VERSION_HEADER: &'static str = "ce_mqttversion";
pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudevents+json";
pub(crate) static CONTENT_TYPE: &'static str = "content-type";

pub enum MqttVersion {
V3_1,
V3_1_1,
V5,
}
pub(crate) static MQTT_V5_BINARY: &'static str = "V5_BINARY";
1 change: 0 additions & 1 deletion cloudevents-sdk-paho-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ pub use mqtt_consumer_record::record_to_event;
pub use mqtt_consumer_record::ConsumerMessageDeserializer;
pub use mqtt_consumer_record::MessageExt;

pub use headers::MqttVersion;
pub use mqtt_producer_record::MessageBuilderExt;
pub use mqtt_producer_record::MessageRecord;
55 changes: 29 additions & 26 deletions cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
};
use cloudevents::{message, Event};
use cloudevents::{message, Data, Event};
use paho_mqtt::{Message, PropertyCode};
use std::collections::HashMap;
use std::convert::TryFrom;
Expand All @@ -16,7 +16,7 @@ pub struct ConsumerMessageDeserializer {
}

impl ConsumerMessageDeserializer {
fn get_mqtt_headers(message: &Message) -> Result<HashMap<String, Vec<u8>>> {
fn get_mqtt_headers(message: &Message) -> HashMap<String, Vec<u8>> {
let mut hm = HashMap::new();
let prop_iterator = message.properties().iter(PropertyCode::UserProperty);

Expand All @@ -25,12 +25,12 @@ impl ConsumerMessageDeserializer {
hm.insert(header.0.to_string(), Vec::from(header.1));
}

Ok(hm)
hm
}

pub fn new(message: &Message) -> Result<ConsumerMessageDeserializer> {
Ok(ConsumerMessageDeserializer {
headers: Self::get_mqtt_headers(message)?,
headers: Self::get_mqtt_headers(message),
payload: Some(message.payload()).map(|s| Vec::from(s)),
})
}
Expand Down Expand Up @@ -110,22 +110,27 @@ impl MessageDeserializer for ConsumerMessageDeserializer {
fn encoding(&self) -> Encoding {
match (
self.headers
.get("content-type")
.get(headers::CONTENT_TYPE)
.map(|s| String::from_utf8(s.to_vec()).ok())
.flatten()
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
.unwrap_or(false),
self.headers.get(headers::SPEC_VERSION_HEADER),
self.headers.get(headers::MQTT_VERSION_HEADER)
.map(|s| String::from_utf8(s.to_vec()).ok())
.flatten()
.map(|s| s.eq(headers::MQTT_V5_BINARY))
.unwrap_or(false),
) {
(true, _) => Encoding::STRUCTURED,
(_, Some(_)) => Encoding::BINARY,
_ => Encoding::UNKNOWN,
(true, true) => Encoding::STRUCTURED,
(_, true) => Encoding::BINARY,
_ => Encoding::STRUCTURED,
}
}
}

pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result<Event> {
match version {
pub fn record_to_event(msg: &Message) -> Result<Event> {
MessageDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
/* match version {
headers::MqttVersion::V5 => {
BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
Expand All @@ -135,16 +140,16 @@ pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result<E
headers::MqttVersion::V3_1_1 => {
StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
}
}*/
}

pub trait MessageExt {
fn to_event(&self, version: headers::MqttVersion) -> Result<Event>;
fn to_event(&self) -> Result<Event>;
}

impl MessageExt for Message {
fn to_event(&self, version: headers::MqttVersion) -> Result<Event> {
record_to_event(self, version)
fn to_event(&self) -> Result<Event> {
record_to_event(self)
}
}

Expand All @@ -155,7 +160,6 @@ mod tests {

use crate::MessageBuilderExt;
use chrono::Utc;
use cloudevents::event::Data;
use cloudevents::{EventBuilder, EventBuilderV10};
use paho_mqtt::MessageBuilder;
use serde_json::json;
Expand All @@ -170,10 +174,10 @@ mod tests {
.time(time)
.source("http://localhost")
.data(
"application/json",
Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()),
"application/octet-stream",
Data::Binary(String::from("hello rust").into_bytes()),
)
.extension("someint", "10")
.extension("mqttversion", headers::MQTT_V5_BINARY)
.build()
.unwrap();

Expand All @@ -183,11 +187,12 @@ mod tests {
.ty("example.test")
.time(time)
.source("http://localhost")
.extension("someint", "10")
.data("application/json", json!({"hello": "world"}))
.extension("mqttversion", headers::MQTT_V5_BINARY)
.data(
"application/octet-stream",
Data::Binary(String::from("hello rust").into_bytes()))
.build()
.unwrap(),
headers::MqttVersion::V5,
)
.unwrap();

Expand All @@ -197,7 +202,7 @@ mod tests {
.qos(1)
.finalize();

assert_eq!(msg.to_event(headers::MqttVersion::V5).unwrap(), expected)
assert_eq!(msg.to_event().unwrap(), expected)
}

#[test]
Expand All @@ -209,7 +214,6 @@ mod tests {
.ty("example.test")
.source("http://localhost")
.data("application/cloudevents+json", j.clone())
.extension("someint", "10")
.build()
.unwrap();

Expand All @@ -218,7 +222,6 @@ mod tests {
.ty("example.test")
.source("http://localhost")
.data("application/cloudevents+json", j.clone())
.extension("someint", "10")
.build()
.unwrap();

Expand All @@ -232,7 +235,7 @@ mod tests {
.finalize();

assert_eq!(
msg.to_event(headers::MqttVersion::V3_1_1).unwrap(),
msg.to_event().unwrap(),
expected
)
}
Expand Down
17 changes: 6 additions & 11 deletions cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,12 @@ impl MessageRecord {
}
}

pub fn from_event(event: Event, version: headers::MqttVersion) -> Result<Self> {
match version {
headers::MqttVersion::V5 => {
BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
}
headers::MqttVersion::V3_1 => {
StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
}
headers::MqttVersion::V3_1_1 => {
StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
}
pub fn from_event(event: Event) -> Result<Self> {
match event.extension(headers::MQTT_VERSION_HEADER)
.map(|e| e.to_string().eq(headers::MQTT_V5_BINARY))
.unwrap_or(false) {
true => BinaryDeserializer::deserialize_binary(event, MessageRecord::new()),
_ => StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions example-projects/paho-mqtt-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn consume_v3(broker: &str, topic_name: &str) {

while let Some(msg_opt) = strm.next().await {
if let Some(msg) = msg_opt {
let event = msg.to_event(MqttVersion::V3_1_1).unwrap();
let event = msg.to_event().unwrap();
println!("Received Event: {:#?}", event);
}
else {
Expand Down Expand Up @@ -90,7 +90,7 @@ fn consume_v5(broker: &str, topic_name: &str) {

while let Some(msg_opt) = strm.next().await {
if let Some(msg) = msg_opt {
let event = msg.to_event(MqttVersion::V5).unwrap();
let event = msg.to_event().unwrap();
println!("Received Event: {:#?}", event);
}
else {
Expand Down Expand Up @@ -131,12 +131,12 @@ fn produce_v3(broker: &str, topic_name: &str) {
.id("1".to_string())
.ty("example.test")
.source("http://localhost/")
.data("application/json", json!({"hello": "world"}))
.data("application/cloudevents+json", json!({"hello": "world"}))
.build()
.unwrap();

let message_record =
MessageRecord::from_event(event, MqttVersion::V3_1_1).expect("error while serializing the event");
MessageRecord::from_event(event).expect("error while serializing the event");

// Create a message and publish it
let msg = mqtt::MessageBuilder::new()
Expand Down Expand Up @@ -185,7 +185,7 @@ fn produce_v5(broker: &str, topic_name: &str) {
.unwrap();

let message_record =
MessageRecord::from_event(event, MqttVersion::V5).expect("error while serializing the event");
MessageRecord::from_event(event).expect("error while serializing the event");

// Create a message and publish it
let msg = mqtt::MessageBuilder::new()
Expand Down

0 comments on commit 6405dbb

Please sign in to comment.