Skip to content

Commit

Permalink
cloudevents#9 Encoders for MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
sbcd90 committed Dec 22, 2020
1 parent dc04020 commit fe06610
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 27 deletions.
38 changes: 14 additions & 24 deletions cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ impl MessageExt for Message {
#[cfg(test)]
mod tests {
use super::*;
use crate::mqtt_producer_record::MessageRecord;

use crate::headers::MqttVersion::{MQTT_3, MQTT_5};
use crate::MessageBuilderExt;
use chrono::Utc;
use cloudevents::event::Data;
Expand All @@ -127,27 +127,23 @@ mod tests {
.build()
.unwrap();

let message_record = MessageRecord::from_event(
EventBuilderV10::new()
.id("0001")
.ty("example.test")
.time(time)
.source("http://localhost")
.extension("someint", "10")
.data("application/json", json!({"hello": "world"}))
.build()
.unwrap(),
headers::MqttVersion::V5,
)
.unwrap();
let event = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.time(time)
.source("http://localhost")
.extension("someint", "10")
.data("application/json", json!({"hello": "world"}))
.build()
.unwrap();

let msg = MessageBuilder::new()
.topic("test")
.message_record(&message_record)
.event(event, MQTT_5)
.qos(1)
.finalize();

assert_eq!(msg.to_event(headers::MqttVersion::V5).unwrap(), expected)
assert_eq!(msg.to_event().unwrap(), expected)
}

#[test]
Expand All @@ -172,18 +168,12 @@ mod tests {
.build()
.unwrap();

let serialized_event =
StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap();

let msg = MessageBuilder::new()
.topic("test")
.message_record(&serialized_event)
.event(input, MQTT_3)
.qos(1)
.finalize();

assert_eq!(
msg.to_event(headers::MqttVersion::V3_1_1).unwrap(),
expected
)
assert_eq!(msg.to_event().unwrap(), expected)
}
}
12 changes: 9 additions & 3 deletions cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::headers;
use crate::headers::MqttVersion::MQTT_5;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
Expand All @@ -22,7 +23,7 @@ impl MessageRecord {
}
}

pub fn from_event(event: Event, version: headers::MqttVersion) -> Result<Self> {
pub fn from_event(event: Event, version: &headers::MqttVersion) -> Result<Self> {
match version {
headers::MqttVersion::MQTT_5 => {
BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
Expand Down Expand Up @@ -130,9 +131,14 @@ pub trait MessageBuilderExt {
impl MessageBuilderExt for MessageBuilder {
fn event(mut self, event: Event, version: headers::MqttVersion) -> MessageBuilder {
let message_record =
MessageRecord::from_event(event, version).expect("error while serializing the event");
MessageRecord::from_event(event, &version).expect("error while serializing the event");

self = self.properties(message_record.headers.clone());
match version {
MQTT_5 => {
self = self.properties(message_record.headers.clone());
}
_ => (),
}

if let Some(s) = message_record.payload.as_ref() {
self = self.payload(s.to_vec());
Expand Down

0 comments on commit fe06610

Please sign in to comment.