Skip to content

Commit

Permalink
Extend and fix subscription identifier encoding and decoding (#46)
Browse files Browse the repository at this point in the history
* Fix subscription identifier parsing

The subscription identifier is encoded as var int according to
[MQTT-3.8.2.1.2]. Add a parsing test with and without subscription
identifier.

* Add subscription identifier decoding checks MQTT-3.8.2.1.2

Accorindng to MQTT-3.8.2.1.2 the subscription identifier must be set
exactly once and must not be 0. Extend the property parsing for
subscribe packets with those checks.

* Extend subscription identifiers

The Publish packet can have multiple subscription identifiers according
to MQTT-3.3.2.3.8. Subscription identifiers must not have a value 0.
  • Loading branch information
flxo authored Apr 29, 2022
1 parent f4c3349 commit 986a540
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 28 deletions.
141 changes: 118 additions & 23 deletions mqtt-v5/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ fn decode_property(
Ok(Some(Property::CorrelationData(CorrelationData(correlation_data))))
},
PropertyType::SubscriptionIdentifier => {
let subscription_identifier = read_u32!(bytes);
let subscription_identifier = read_variable_int!(bytes);
Ok(Some(Property::SubscriptionIdentifier(SubscriptionIdentifier(VariableByteInt(
subscription_identifier,
)))))
Expand Down Expand Up @@ -303,6 +303,16 @@ fn decode_property(
fn decode_properties<F: FnMut(Property)>(
bytes: &mut Cursor<&mut BytesMut>,
mut closure: F,
) -> Result<Option<()>, DecodeError> {
try_decode_properties(bytes, |property| {
closure(property);
Ok(())
})
}

fn try_decode_properties<F: FnMut(Property) -> Result<(), DecodeError>>(
bytes: &mut Cursor<&mut BytesMut>,
mut closure: F,
) -> Result<Option<()>, DecodeError> {
let property_length = read_variable_int!(bytes);

Expand All @@ -322,7 +332,7 @@ fn decode_properties<F: FnMut(Property)>(
}

let property = read_property!(bytes);
closure(property);
closure(property)?;
}

Ok(Some(()))
Expand Down Expand Up @@ -564,23 +574,48 @@ fn decode_publish(
let mut response_topic = None;
let mut correlation_data = None;
let mut user_properties = vec![];
let mut subscription_identifier = None;
let mut subscription_identifiers = None;
let mut content_type = None;

if protocol_version == ProtocolVersion::V500 {
return_if_none!(decode_properties(bytes, |property| {
match property {
Property::PayloadFormatIndicator(p) => payload_format_indicator = Some(p),
Property::MessageExpiryInterval(p) => message_expiry_interval = Some(p),
Property::TopicAlias(p) => topic_alias = Some(p),
Property::ResponseTopic(p) => response_topic = Some(p),
Property::CorrelationData(p) => correlation_data = Some(p),
Property::UserProperty(p) => user_properties.push(p),
Property::SubscriptionIdentifier(p) => subscription_identifier = Some(p),
Property::ContentType(p) => content_type = Some(p),
_ => {}, // Invalid property for packet
}
})?);
try_decode_properties(bytes, |property| match property {
Property::PayloadFormatIndicator(p) => {
payload_format_indicator = Some(p);
Ok(())
},
Property::MessageExpiryInterval(p) => {
message_expiry_interval = Some(p);
Ok(())
},
Property::TopicAlias(p) => {
topic_alias = Some(p);
Ok(())
},
Property::ResponseTopic(p) => {
response_topic = Some(p);
Ok(())
},
Property::CorrelationData(p) => {
correlation_data = Some(p);
Ok(())
},
Property::UserProperty(p) => {
user_properties.push(p);
Ok(())
},
Property::SubscriptionIdentifier(SubscriptionIdentifier(VariableByteInt(0))) => {
Err(DecodeError::InvalidSubscriptionIdentifier)
},
Property::SubscriptionIdentifier(p) => {
subscription_identifiers.get_or_insert(Vec::new()).push(p);
Ok(())
},
Property::ContentType(p) => {
content_type = Some(p);
Ok(())
},
_ => Err(DecodeError::InvalidPropertyForPacket),
})?;
}

let end_cursor_pos = bytes.position();
Expand All @@ -592,6 +627,8 @@ fn decode_publish(
}
let payload_size = remaining_packet_length - variable_header_size;
let payload = return_if_none!(decode_binary_data_with_size(bytes, payload_size as usize)?);
let subscription_identifiers =
subscription_identifiers.unwrap_or_else(|| Vec::with_capacity(0));

let packet = PublishPacket {
is_duplicate,
Expand All @@ -607,7 +644,7 @@ fn decode_publish(
response_topic,
correlation_data,
user_properties,
subscription_identifier,
subscription_identifiers,
content_type,

payload,
Expand Down Expand Up @@ -781,13 +818,27 @@ fn decode_subscribe(
let mut user_properties = vec![];

if protocol_version == ProtocolVersion::V500 {
return_if_none!(decode_properties(bytes, |property| {
try_decode_properties(bytes, |property| {
match property {
Property::SubscriptionIdentifier(p) => subscription_identifier = Some(p),
Property::UserProperty(p) => user_properties.push(p),
_ => {}, // Invalid property for packet
// [MQTT-3.8.2.1.2] The subscription identifier is allowed exactly once
Property::SubscriptionIdentifier(_) if subscription_identifier.is_some() => {
Err(DecodeError::InvalidSubscriptionIdentifier)
},
// [MQTT-3.8.2.1.2] The subscription identifier must not be 0
Property::SubscriptionIdentifier(SubscriptionIdentifier(VariableByteInt(0))) => {
Err(DecodeError::InvalidSubscriptionIdentifier)
},
Property::SubscriptionIdentifier(p) => {
subscription_identifier = Some(p);
Ok(())
},
Property::UserProperty(p) => {
user_properties.push(p);
Ok(())
},
_ => Err(DecodeError::InvalidPropertyForPacket),
}
})?);
})?;
}

let variable_header_size = (bytes.position() - start_cursor_pos) as u32;
Expand Down Expand Up @@ -1153,7 +1204,7 @@ pub fn decode_mqtt(

#[cfg(test)]
mod tests {
use crate::{decoder::*, types::*};
use crate::{decoder::*, topic::TopicFilter, types::*};
use bytes::BytesMut;

#[test]
Expand Down Expand Up @@ -1196,4 +1247,48 @@ mod tests {
normal_test(&[0x80, 0x80, 0x80, 0x01], 2097152);
normal_test(&[0xFF, 0xFF, 0xFF, 0x7F], 268435455);
}

#[test]
fn test_decode_subscribe() {
// Subscribe packet *without* Subscription Identifier
let mut without_subscription_identifier = BytesMut::from(
[0x82, 0x0a, 0x00, 0x01, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x00].as_slice(),
);
let without_subscription_identifier_expected = Packet::Subscribe(SubscribePacket {
packet_id: 1,
subscription_identifier: None,
user_properties: vec![],
subscription_topics: vec![SubscriptionTopic {
topic_filter: TopicFilter::Concrete { filter: "test".into(), level_count: 1 },
maximum_qos: QoS::AtMostOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
}],
});
let decoded = decode_mqtt(&mut without_subscription_identifier, ProtocolVersion::V500)
.unwrap()
.unwrap();
assert_eq!(without_subscription_identifier_expected, decoded);

// Subscribe packet with Subscription Identifier
let mut packet = BytesMut::from(
[0x82, 0x0c, 0xff, 0xf6, 0x02, 0x0b, 0x01, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x02]
.as_slice(),
);
let decoded = decode_mqtt(&mut packet, ProtocolVersion::V500).unwrap().unwrap();
let with_subscription_identifier_expected = Packet::Subscribe(SubscribePacket {
packet_id: 65526,
subscription_identifier: Some(SubscriptionIdentifier(VariableByteInt(1))),
user_properties: vec![],
subscription_topics: vec![SubscriptionTopic {
topic_filter: TopicFilter::Concrete { filter: "test".into(), level_count: 1 },
maximum_qos: QoS::ExactlyOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
}],
});
assert_eq!(with_subscription_identifier_expected, decoded);
}
}
4 changes: 2 additions & 2 deletions mqtt-v5/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ fn encode_publish(packet: &PublishPacket, bytes: &mut BytesMut, protocol_version
packet.response_topic.encode(bytes);
packet.correlation_data.encode(bytes);
packet.user_properties.encode(bytes);
packet.subscription_identifier.encode(bytes);
packet.subscription_identifiers.encode(bytes);
packet.content_type.encode(bytes);
}

Expand Down Expand Up @@ -660,7 +660,7 @@ mod tests {
response_topic: None,
correlation_data: None,
user_properties: vec![],
subscription_identifier: None,
subscription_identifiers: Vec::with_capacity(0),
content_type: None,

payload: vec![22; 100].into(),
Expand Down
20 changes: 17 additions & 3 deletions mqtt-v5/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub enum DecodeError {
InvalidPublishReleaseReason,
InvalidPublishCompleteReason,
InvalidSubscribeAckReason,
InvalidSubscriptionIdentifier,
InvalidUnsubscribeAckReason,
InvalidAuthenticateReason,
InvalidPropertyId,
Expand Down Expand Up @@ -129,6 +130,14 @@ impl Encode for Vec<UserProperty> {
}
}

impl Encode for Vec<SubscriptionIdentifier> {
fn encode(&self, bytes: &mut BytesMut) {
for identifier in self {
identifier.encode(bytes);
}
}
}

impl PacketSize for u16 {
fn calc_size(&self, _protocol_version: ProtocolVersion) -> u32 {
2
Expand Down Expand Up @@ -317,6 +326,11 @@ pub mod properties {
1 + self.0.calc_size(protocol_version)
}
}
impl PacketSize for Vec<SubscriptionIdentifier> {
fn calc_size(&self, protocol_version: ProtocolVersion) -> u32 {
self.iter().map(|x| x.calc_size(protocol_version)).sum()
}
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SessionExpiryInterval(pub u32);
Expand Down Expand Up @@ -917,7 +931,7 @@ pub struct PublishPacket {
pub response_topic: Option<ResponseTopic>,
pub correlation_data: Option<CorrelationData>,
pub user_properties: Vec<UserProperty>,
pub subscription_identifier: Option<SubscriptionIdentifier>,
pub subscription_identifiers: Vec<SubscriptionIdentifier>,
pub content_type: Option<ContentType>,

// Payload
Expand All @@ -933,7 +947,7 @@ impl PropertySize for PublishPacket {
property_size += self.response_topic.calc_size(protocol_version);
property_size += self.correlation_data.calc_size(protocol_version);
property_size += self.user_properties.calc_size(protocol_version);
property_size += self.subscription_identifier.calc_size(protocol_version);
property_size += self.subscription_identifiers.calc_size(protocol_version);
property_size += self.content_type.calc_size(protocol_version);

property_size
Expand All @@ -958,7 +972,7 @@ impl From<FinalWill> for PublishPacket {
response_topic: will.response_topic,
correlation_data: will.correlation_data,
user_properties: will.user_properties,
subscription_identifier: None,
subscription_identifiers: Vec::with_capacity(0),
content_type: will.content_type,

// Payload
Expand Down

0 comments on commit 986a540

Please sign in to comment.