diff --git a/mavlink-core/src/peek_reader.rs b/mavlink-core/src/peek_reader.rs index 5a5a8a11c5..47eb874cb1 100644 --- a/mavlink-core/src/peek_reader.rs +++ b/mavlink-core/src/peek_reader.rs @@ -119,16 +119,20 @@ impl PeekReader { /// Internal function to fetch data from the internal buffer and/or reader fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> { - let buffered = self.top - self.cursor; + loop { + let buffered = self.top - self.cursor; - // the caller requested more bytes than we have buffered, fetch them from the reader - if buffered < amount { - let bytes_read = amount - buffered; - assert!(bytes_read < BUFFER_SIZE); + if buffered >= amount { + break; + } + + // the caller requested more bytes than we have buffered, fetch them from the reader + let bytes_to_read = amount - buffered; + assert!(bytes_to_read < BUFFER_SIZE); let mut buf = [0u8; BUFFER_SIZE]; // read needed bytes from reader - self.reader.read_exact(&mut buf[..bytes_read])?; + let bytes_read = self.reader.read(&mut buf[..bytes_to_read])?; // if some bytes were read, add them to the buffer diff --git a/mavlink/tests/test_shared/mod.rs b/mavlink/tests/test_shared/mod.rs index 90f019cebd..fea4f0e991 100644 --- a/mavlink/tests/test_shared/mod.rs +++ b/mavlink/tests/test_shared/mod.rs @@ -110,3 +110,39 @@ pub fn get_apm_mount_status() -> mavlink::ardupilotmega::MOUNT_STATUS_DATA { target_component: 3, } } + +pub struct BlockyReader<'a> { + block_next_read: bool, + data: &'a [u8], + index: usize, +} + +impl<'a> BlockyReader<'a> { + pub fn new(data: &'a [u8]) -> Self { + BlockyReader { + block_next_read: true, + data, + index: 0, + } + } +} + +impl<'a> std::io::Read for BlockyReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + use std::io::{Error, ErrorKind, Result}; + + if self.block_next_read { + self.block_next_read = false; + Result::Err(Error::new(ErrorKind::WouldBlock, "Test Block")) + } else { + let read = self + .data + .get(self.index) + .ok_or(Error::new(ErrorKind::UnexpectedEof, "EOF")); + buf[0] = *read?; + self.index += 1; + self.block_next_read = true; + Ok(1) + } + } +} diff --git a/mavlink/tests/v1_encode_decode_tests.rs b/mavlink/tests/v1_encode_decode_tests.rs index 048e97c8e3..7224393686 100644 --- a/mavlink/tests/v1_encode_decode_tests.rs +++ b/mavlink/tests/v1_encode_decode_tests.rs @@ -101,4 +101,24 @@ mod test_v1_encode_decode { assert_eq!(raw_msg.raw_bytes(), HEARTBEAT_V1); assert!(raw_msg.has_valid_crc::()); } + + #[test] + pub fn test_read_error() { + use std::io::ErrorKind; + + use mavlink_core::error::MessageReadError; + + let mut reader = PeekReader::new(crate::test_shared::BlockyReader::new(HEARTBEAT_V1)); + + loop { + match mavlink::read_v1_msg::(&mut reader) { + Ok((header, _)) => { + assert_eq!(header, crate::test_shared::COMMON_MSG_HEADER); + break; + } + Err(MessageReadError::Io(err)) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => panic!("{err}"), + } + } + } } diff --git a/mavlink/tests/v2_encode_decode_tests.rs b/mavlink/tests/v2_encode_decode_tests.rs index f83eb127a1..31f0d4d4fc 100644 --- a/mavlink/tests/v2_encode_decode_tests.rs +++ b/mavlink/tests/v2_encode_decode_tests.rs @@ -169,4 +169,24 @@ mod test_v2_encode_decode { assert_eq!(raw_msg.raw_bytes(), HEARTBEAT_V2); assert!(raw_msg.has_valid_crc::()); } + + #[test] + pub fn test_read_error() { + use std::io::ErrorKind; + + use mavlink_core::error::MessageReadError; + + let mut reader = PeekReader::new(crate::test_shared::BlockyReader::new(HEARTBEAT_V2)); + + loop { + match mavlink::read_v2_msg::(&mut reader) { + Ok((header, _)) => { + assert_eq!(header, crate::test_shared::COMMON_MSG_HEADER); + break; + } + Err(MessageReadError::Io(err)) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => panic!("{err}"), + } + } + } }