Skip to content

Commit

Permalink
feat: implement non-blocking read handling in PeekReader and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kusstas committed Dec 22, 2024
1 parent f96dacd commit f801bea
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 6 deletions.
16 changes: 10 additions & 6 deletions mavlink-core/src/peek_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,20 @@ impl<R: Read, const BUFFER_SIZE: usize> PeekReader<R, BUFFER_SIZE> {

/// Internal function to fetch data from the internal buffer and/or reader
fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> {
let buffered = self.top - self.cursor;
loop {
let buffered = self.top - self.cursor;

// the caller requested more bytes than we have buffered, fetch them from the reader
if buffered < amount {
let bytes_read = amount - buffered;
assert!(bytes_read < BUFFER_SIZE);
if buffered >= amount {
break;
}

// the caller requested more bytes than we have buffered, fetch them from the reader
let bytes_to_read = amount - buffered;
assert!(bytes_to_read < BUFFER_SIZE);
let mut buf = [0u8; BUFFER_SIZE];

// read needed bytes from reader
self.reader.read_exact(&mut buf[..bytes_read])?;
let bytes_read = self.reader.read(&mut buf[..bytes_to_read])?;

// if some bytes were read, add them to the buffer

Expand Down
36 changes: 36 additions & 0 deletions mavlink/tests/test_shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,39 @@ pub fn get_apm_mount_status() -> mavlink::ardupilotmega::MOUNT_STATUS_DATA {
target_component: 3,
}
}

pub struct BlockyReader<'a> {
block_next_read: bool,
data: &'a [u8],
index: usize,
}

impl<'a> BlockyReader<'a> {
pub fn new(data: &'a [u8]) -> Self {
BlockyReader {
block_next_read: true,
data,
index: 0,
}
}
}

impl<'a> std::io::Read for BlockyReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
use std::io::{Error, ErrorKind, Result};

if self.block_next_read {
self.block_next_read = false;
Result::Err(Error::new(ErrorKind::WouldBlock, "Test Block"))
} else {
let read = self
.data
.get(self.index)
.ok_or(Error::new(ErrorKind::UnexpectedEof, "EOF"));
buf[0] = *read?;
self.index += 1;
self.block_next_read = true;
Ok(1)
}
}
}
20 changes: 20 additions & 0 deletions mavlink/tests/v1_encode_decode_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,24 @@ mod test_v1_encode_decode {
assert_eq!(raw_msg.raw_bytes(), HEARTBEAT_V1);
assert!(raw_msg.has_valid_crc::<mavlink::common::MavMessage>());
}

#[test]
pub fn test_read_error() {
use std::io::ErrorKind;

use mavlink_core::error::MessageReadError;

let mut reader = PeekReader::new(crate::test_shared::BlockyReader::new(HEARTBEAT_V1));

loop {
match mavlink::read_v1_msg::<mavlink::common::MavMessage, _>(&mut reader) {
Ok((header, _)) => {
assert_eq!(header, crate::test_shared::COMMON_MSG_HEADER);
break;
}
Err(MessageReadError::Io(err)) if err.kind() == ErrorKind::WouldBlock => {}
Err(err) => panic!("{err}"),
}
}
}
}
20 changes: 20 additions & 0 deletions mavlink/tests/v2_encode_decode_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,24 @@ mod test_v2_encode_decode {
assert_eq!(raw_msg.raw_bytes(), HEARTBEAT_V2);
assert!(raw_msg.has_valid_crc::<mavlink::common::MavMessage>());
}

#[test]
pub fn test_read_error() {
use std::io::ErrorKind;

use mavlink_core::error::MessageReadError;

let mut reader = PeekReader::new(crate::test_shared::BlockyReader::new(HEARTBEAT_V2));

loop {
match mavlink::read_v2_msg::<mavlink::common::MavMessage, _>(&mut reader) {
Ok((header, _)) => {
assert_eq!(header, crate::test_shared::COMMON_MSG_HEADER);
break;
}
Err(MessageReadError::Io(err)) if err.kind() == ErrorKind::WouldBlock => {}
Err(err) => panic!("{err}"),
}
}
}
}

0 comments on commit f801bea

Please sign in to comment.