Skip to content

Commit

Permalink
Merge pull request #2 from bschwind/topic-filters
Browse files Browse the repository at this point in the history
Topic filters
  • Loading branch information
bschwind authored Jan 26, 2020
2 parents 0aff95b + 3bc9acc commit 0b684bf
Show file tree
Hide file tree
Showing 7 changed files with 987 additions and 20 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ Testing
-------

$ cargo test

Code Format
-----------
The formatting options currently use nightly-only options.

$ cargo +nightly fmt
8 changes: 4 additions & 4 deletions src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ fn decode_publish_complete(
fn decode_subscribe(
bytes: &mut Cursor<&mut BytesMut>,
remaining_packet_length: u32,
protocol_version: &ProtocolVersion,
protocol_version: ProtocolVersion,
) -> Result<Option<Packet>, DecodeError> {
let start_cursor_pos = bytes.position();

Expand All @@ -752,7 +752,7 @@ fn decode_subscribe(
let mut subscription_identifier = None;
let mut user_properties = vec![];

if *protocol_version == ProtocolVersion::V500 {
if protocol_version == ProtocolVersion::V500 {
return_if_none!(decode_properties(bytes, |property| {
match property {
Property::SubscriptionIdentifier(p) => subscription_identifier = Some(p),
Expand Down Expand Up @@ -1016,7 +1016,7 @@ fn decode_authenticate(
}

fn decode_packet(
protocol_version: &ProtocolVersion,
protocol_version: ProtocolVersion,
packet_type: &PacketType,
bytes: &mut Cursor<&mut BytesMut>,
remaining_packet_length: u32,
Expand All @@ -1043,7 +1043,7 @@ fn decode_packet(

pub fn decode_mqtt(
bytes: &mut BytesMut,
protocol_version: &ProtocolVersion,
protocol_version: ProtocolVersion,
) -> Result<Option<Packet>, DecodeError> {
let mut bytes = Cursor::new(bytes);
let first_byte = read_u8!(bytes);
Expand Down
30 changes: 15 additions & 15 deletions src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand Down Expand Up @@ -562,7 +562,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand Down Expand Up @@ -591,7 +591,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -608,7 +608,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -625,7 +625,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -642,7 +642,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -659,7 +659,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -683,7 +683,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -701,7 +701,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -718,7 +718,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -736,7 +736,7 @@ mod tests {

let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -746,7 +746,7 @@ mod tests {
let packet = Packet::PingRequest;
let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -756,7 +756,7 @@ mod tests {
let packet = Packet::PingResponse;
let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -773,7 +773,7 @@ mod tests {
});
let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand All @@ -790,7 +790,7 @@ mod tests {
});
let mut bytes = BytesMut::new();
encode_mqtt(&packet, &mut bytes);
let decoded = decode_mqtt(&mut bytes, &ProtocolVersion::V500).unwrap().unwrap();
let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

assert_eq!(packet, decoded);
}
Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod broker;
mod client;
mod decoder;
mod encoder;
mod topic;
mod types;

pub struct MqttCodec {
Expand All @@ -28,7 +29,7 @@ impl MqttCodec {

pub fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Packet>, DecodeError> {
// TODO - Ideally we should keep a state machine to store the data we've read so far.
let packet = decoder::decode_mqtt(buf, &self.version);
let packet = decoder::decode_mqtt(buf, self.version);

if let Ok(Some(Packet::Connect(packet))) = &packet {
self.version = packet.protocol_version;
Expand Down
Loading

0 comments on commit 0b684bf

Please sign in to comment.