From f96dacdc4dc3104d74ac37ffb995390aba5fc7bf Mon Sep 17 00:00:00 2001 From: Stanislav Kusovskyi Date: Thu, 19 Dec 2024 23:06:56 +0000 Subject: [PATCH 1/5] Fix read error leads to skip message for blocking API --- mavlink-core/src/lib.rs | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs index 2b5626a263..6d4827a392 100644 --- a/mavlink-core/src/lib.rs +++ b/mavlink-core/src/lib.rs @@ -449,22 +449,20 @@ pub fn read_v1_raw_message( reader: &mut PeekReader, ) -> Result { loop { - loop { - // search for the magic framing value indicating start of mavlink message - if reader.read_u8()? == MAV_STX { - break; - } + // search for the magic framing value indicating start of mavlink message + while reader.peek_exact(1)?[0] != MAV_STX { + reader.consume(1); } let mut message = MAVLinkV1MessageRaw::new(); + let whole_header_size = MAVLinkV1MessageRaw::HEADER_SIZE + 1; message.0[0] = MAV_STX; - let header = &reader.peek_exact(MAVLinkV1MessageRaw::HEADER_SIZE)? - [..MAVLinkV1MessageRaw::HEADER_SIZE]; + let header = &reader.peek_exact(whole_header_size)?[1..whole_header_size]; message.mut_header().copy_from_slice(header); - let packet_length = message.raw_bytes().len() - 1; + let packet_length = message.raw_bytes().len(); let payload_and_checksum = - &reader.peek_exact(packet_length)?[MAVLinkV1MessageRaw::HEADER_SIZE..packet_length]; + &reader.peek_exact(packet_length)?[whole_header_size..packet_length]; message .mut_payload_and_checksum() .copy_from_slice(payload_and_checksum); @@ -472,9 +470,11 @@ pub fn read_v1_raw_message( // retry if CRC failed after previous STX // (an STX byte may appear in the middle of a message) if message.has_valid_crc::() { - reader.consume(message.raw_bytes().len() - 1); + reader.consume(message.raw_bytes().len()); return Ok(message); } + + reader.consume(1); } } @@ -913,36 +913,36 @@ fn read_v2_raw_message_inner( signing_data: Option<&SigningData>, ) -> Result { loop { - loop { - // search for the magic framing value indicating start of mavlink message - if reader.read_u8()? == MAV_STX_V2 { - break; - } + // search for the magic framing value indicating start of mavlink message + while reader.peek_exact(1)?[0] != MAV_STX_V2 { + reader.consume(1); } let mut message = MAVLinkV2MessageRaw::new(); + let whole_header_size = MAVLinkV2MessageRaw::HEADER_SIZE + 1; message.0[0] = MAV_STX_V2; - let header = &reader.peek_exact(MAVLinkV2MessageRaw::HEADER_SIZE)? - [..MAVLinkV2MessageRaw::HEADER_SIZE]; + let header = &reader.peek_exact(whole_header_size)?[1..whole_header_size]; message.mut_header().copy_from_slice(header); if message.incompatibility_flags() & !MAVLINK_SUPPORTED_IFLAGS > 0 { // if there are incompatibility flags set that we do not know discard the message + reader.consume(1); continue; } - let packet_length = message.raw_bytes().len() - 1; + let packet_length = message.raw_bytes().len(); let payload_and_checksum_and_sign = - &reader.peek_exact(packet_length)?[MAVLinkV2MessageRaw::HEADER_SIZE..packet_length]; + &reader.peek_exact(packet_length)?[whole_header_size..packet_length]; message .mut_payload_and_checksum_and_sign() .copy_from_slice(payload_and_checksum_and_sign); if message.has_valid_crc::() { // even if the signature turn out to be invalid the valid crc shows that the received data presents a valid message as opposed to random bytes - reader.consume(message.raw_bytes().len() - 1); + reader.consume(message.raw_bytes().len()); } else { + reader.consume(1); continue; } From f801beaed450543203d1bed23447e28c01dc1342 Mon Sep 17 00:00:00 2001 From: Stanislav Kusovskyi Date: Sun, 22 Dec 2024 15:16:38 +0000 Subject: [PATCH 2/5] feat: implement non-blocking read handling in PeekReader and add tests --- mavlink-core/src/peek_reader.rs | 16 ++++++----- mavlink/tests/test_shared/mod.rs | 36 +++++++++++++++++++++++++ mavlink/tests/v1_encode_decode_tests.rs | 20 ++++++++++++++ mavlink/tests/v2_encode_decode_tests.rs | 20 ++++++++++++++ 4 files changed, 86 insertions(+), 6 deletions(-) diff --git a/mavlink-core/src/peek_reader.rs b/mavlink-core/src/peek_reader.rs index 5a5a8a11c5..47eb874cb1 100644 --- a/mavlink-core/src/peek_reader.rs +++ b/mavlink-core/src/peek_reader.rs @@ -119,16 +119,20 @@ impl PeekReader { /// Internal function to fetch data from the internal buffer and/or reader fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> { - let buffered = self.top - self.cursor; + loop { + let buffered = self.top - self.cursor; - // the caller requested more bytes than we have buffered, fetch them from the reader - if buffered < amount { - let bytes_read = amount - buffered; - assert!(bytes_read < BUFFER_SIZE); + if buffered >= amount { + break; + } + + // the caller requested more bytes than we have buffered, fetch them from the reader + let bytes_to_read = amount - buffered; + assert!(bytes_to_read < BUFFER_SIZE); let mut buf = [0u8; BUFFER_SIZE]; // read needed bytes from reader - self.reader.read_exact(&mut buf[..bytes_read])?; + let bytes_read = self.reader.read(&mut buf[..bytes_to_read])?; // if some bytes were read, add them to the buffer diff --git a/mavlink/tests/test_shared/mod.rs b/mavlink/tests/test_shared/mod.rs index 90f019cebd..fea4f0e991 100644 --- a/mavlink/tests/test_shared/mod.rs +++ b/mavlink/tests/test_shared/mod.rs @@ -110,3 +110,39 @@ pub fn get_apm_mount_status() -> mavlink::ardupilotmega::MOUNT_STATUS_DATA { target_component: 3, } } + +pub struct BlockyReader<'a> { + block_next_read: bool, + data: &'a [u8], + index: usize, +} + +impl<'a> BlockyReader<'a> { + pub fn new(data: &'a [u8]) -> Self { + BlockyReader { + block_next_read: true, + data, + index: 0, + } + } +} + +impl<'a> std::io::Read for BlockyReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + use std::io::{Error, ErrorKind, Result}; + + if self.block_next_read { + self.block_next_read = false; + Result::Err(Error::new(ErrorKind::WouldBlock, "Test Block")) + } else { + let read = self + .data + .get(self.index) + .ok_or(Error::new(ErrorKind::UnexpectedEof, "EOF")); + buf[0] = *read?; + self.index += 1; + self.block_next_read = true; + Ok(1) + } + } +} diff --git a/mavlink/tests/v1_encode_decode_tests.rs b/mavlink/tests/v1_encode_decode_tests.rs index 048e97c8e3..7224393686 100644 --- a/mavlink/tests/v1_encode_decode_tests.rs +++ b/mavlink/tests/v1_encode_decode_tests.rs @@ -101,4 +101,24 @@ mod test_v1_encode_decode { assert_eq!(raw_msg.raw_bytes(), HEARTBEAT_V1); assert!(raw_msg.has_valid_crc::()); } + + #[test] + pub fn test_read_error() { + use std::io::ErrorKind; + + use mavlink_core::error::MessageReadError; + + let mut reader = PeekReader::new(crate::test_shared::BlockyReader::new(HEARTBEAT_V1)); + + loop { + match mavlink::read_v1_msg::(&mut reader) { + Ok((header, _)) => { + assert_eq!(header, crate::test_shared::COMMON_MSG_HEADER); + break; + } + Err(MessageReadError::Io(err)) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => panic!("{err}"), + } + } + } } diff --git a/mavlink/tests/v2_encode_decode_tests.rs b/mavlink/tests/v2_encode_decode_tests.rs index f83eb127a1..31f0d4d4fc 100644 --- a/mavlink/tests/v2_encode_decode_tests.rs +++ b/mavlink/tests/v2_encode_decode_tests.rs @@ -169,4 +169,24 @@ mod test_v2_encode_decode { assert_eq!(raw_msg.raw_bytes(), HEARTBEAT_V2); assert!(raw_msg.has_valid_crc::()); } + + #[test] + pub fn test_read_error() { + use std::io::ErrorKind; + + use mavlink_core::error::MessageReadError; + + let mut reader = PeekReader::new(crate::test_shared::BlockyReader::new(HEARTBEAT_V2)); + + loop { + match mavlink::read_v2_msg::(&mut reader) { + Ok((header, _)) => { + assert_eq!(header, crate::test_shared::COMMON_MSG_HEADER); + break; + } + Err(MessageReadError::Io(err)) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => panic!("{err}"), + } + } + } } From 771564feca07faaab5a3079dfc2672d30b75ab79 Mon Sep 17 00:00:00 2001 From: Stanislav Kusovskyi Date: Sun, 22 Dec 2024 17:44:27 +0000 Subject: [PATCH 3/5] fix: handle unexpected EOF in PeekReader in case zero bytes has read --- mavlink-core/src/peek_reader.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mavlink-core/src/peek_reader.rs b/mavlink-core/src/peek_reader.rs index 47eb874cb1..8b316d7ad0 100644 --- a/mavlink-core/src/peek_reader.rs +++ b/mavlink-core/src/peek_reader.rs @@ -134,6 +134,12 @@ impl PeekReader { // read needed bytes from reader let bytes_read = self.reader.read(&mut buf[..bytes_to_read])?; + if bytes_read == 0 { + return Err(MessageReadError::Io( + std::io::ErrorKind::UnexpectedEof.into(), + )); + } + // if some bytes were read, add them to the buffer if self.buffer.len() - self.top < bytes_read { From b18d43c0a7c3a8f47ef89094b86468a02268c026 Mon Sep 17 00:00:00 2001 From: Stanislav Kusovskyi Date: Mon, 23 Dec 2024 21:42:23 +0000 Subject: [PATCH 4/5] fix: change reader_ref method to return an immutable reference --- mavlink-core/src/peek_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mavlink-core/src/peek_reader.rs b/mavlink-core/src/peek_reader.rs index 8b316d7ad0..b67da6a32b 100644 --- a/mavlink-core/src/peek_reader.rs +++ b/mavlink-core/src/peek_reader.rs @@ -106,7 +106,7 @@ impl PeekReader { /// Returns an immutable reference to the underlying [`std::io::Read`]er /// /// Reading directly from the underlying stream will cause data loss - pub fn reader_ref(&mut self) -> &R { + pub fn reader_ref(&self) -> &R { &self.reader } From e481084a24572016d383271cba7c16f7daac3b15 Mon Sep 17 00:00:00 2001 From: Stanislav Kusovskyi Date: Tue, 24 Dec 2024 13:17:51 +0000 Subject: [PATCH 5/5] feat: add read method to Read trait for embedded traits --- mavlink-core/src/embedded.rs | 4 ++++ mavlink-core/src/error.rs | 9 +++++++++ mavlink-core/src/peek_reader.rs | 4 +--- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/mavlink-core/src/embedded.rs b/mavlink-core/src/embedded.rs index 96917e506c..3d1e7e5e14 100644 --- a/mavlink-core/src/embedded.rs +++ b/mavlink-core/src/embedded.rs @@ -5,6 +5,10 @@ const _: () = panic!("Only one of 'embedded' and 'embedded-hal-02' features can /// Replacement for std::io::Read + byteorder::ReadBytesExt in no_std envs pub trait Read { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.read_exact(buf).map(|_| buf.len()) + } + fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), MessageReadError>; } diff --git a/mavlink-core/src/error.rs b/mavlink-core/src/error.rs index 49394f8d64..08e0bed744 100644 --- a/mavlink-core/src/error.rs +++ b/mavlink-core/src/error.rs @@ -37,6 +37,15 @@ pub enum MessageReadError { Parse(ParserError), } +impl MessageReadError { + pub fn eof() -> Self { + #[cfg(feature = "std")] + return Self::Io(std::io::ErrorKind::UnexpectedEof.into()); + #[cfg(any(feature = "embedded", feature = "embedded-hal-02"))] + return Self::Io; + } +} + impl Display for MessageReadError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { match self { diff --git a/mavlink-core/src/peek_reader.rs b/mavlink-core/src/peek_reader.rs index b67da6a32b..ec3c1de20b 100644 --- a/mavlink-core/src/peek_reader.rs +++ b/mavlink-core/src/peek_reader.rs @@ -135,9 +135,7 @@ impl PeekReader { let bytes_read = self.reader.read(&mut buf[..bytes_to_read])?; if bytes_read == 0 { - return Err(MessageReadError::Io( - std::io::ErrorKind::UnexpectedEof.into(), - )); + return Err(MessageReadError::eof()); } // if some bytes were read, add them to the buffer