Skip to content

Commit

Permalink
Simplified PeekReader for MAVLink and made it friendly
Browse files Browse the repository at this point in the history
  • Loading branch information
Tiago Marques authored and patrickelectric committed Mar 28, 2024
1 parent ad39b28 commit 033b069
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 122 deletions.
6 changes: 2 additions & 4 deletions mavlink-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ use utils::{remove_trailing_zeroes, RustDefault};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[cfg(feature = "std")]
pub mod peek_reader;
#[cfg(feature = "std")]
use peek_reader::PeekReader;

use crate::{bytes::Bytes, error::ParserError};
Expand Down Expand Up @@ -366,7 +364,7 @@ impl MAVLinkV1MessageRaw {
/// V1 maximum size is 263 bytes: `<https://mavlink.io/en/guide/serialization.html>`
pub fn read_v1_raw_message<M: Message, R: Read>(
reader: &mut PeekReader<R>,
) -> Result<MAVLinkV1MessageRaw, std::io::Error> {
) -> Result<MAVLinkV1MessageRaw, error::MessageReadError> {
loop {
loop {
// search for the magic framing value indicating start of mavlink message
Expand Down Expand Up @@ -594,7 +592,7 @@ impl MAVLinkV2MessageRaw {
/// V2 maximum size is 280 bytes: `<https://mavlink.io/en/guide/serialization.html>`
pub fn read_v2_raw_message<M: Message, R: Read>(
reader: &mut PeekReader<R>,
) -> Result<MAVLinkV2MessageRaw, std::io::Error> {
) -> Result<MAVLinkV2MessageRaw, error::MessageReadError> {
loop {
loop {
// search for the magic framing value indicating start of mavlink message
Expand Down
169 changes: 51 additions & 118 deletions mavlink-core/src/peek_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! The purpose of the buffered/peekable reader is to allow for backtracking parsers.
//!
//! A reader implementing the standard librairy's [`std::io::BufRead`] trait seems like a good fit, but
//! A reader implementing the standard library's [`std::io::BufRead`] trait seems like a good fit, but
//! it does not allow for peeking a specific number of bytes, so it provides no way to request
//! more data from the underlying reader without consuming the existing data.
//!
Expand All @@ -11,11 +11,13 @@
//! The main type `PeekReader`does not implement [`std::io::Read`] itself, as there is no added benefit
//! in doing so.
//!
#[cfg(feature = "embedded")]
use crate::embedded::Read;

use std::io::{self, ErrorKind, Read};
#[cfg(feature = "std")]
use std::io::Read;

// The default chunk size to read from the underlying reader
const DEFAULT_CHUNK_SIZE: usize = 1024;
use crate::error::MessageReadError;

/// A buffered/peekable reader
///
Expand All @@ -24,51 +26,31 @@ const DEFAULT_CHUNK_SIZE: usize = 1024;
/// It allows the user to `peek` a specified number of bytes (without consuming them),
/// to `read` bytes (consuming them), or to `consume` them after `peek`ing.
///
pub struct PeekReader<R> {
/// NOTE: This reader is generic over the size of the buffer, defaulting to MAVLink's current largest
/// possible message size of 280 bytes
///
pub struct PeekReader<R, const BUFFER_SIZE: usize = 280> {
// Internal buffer
buffer: Vec<u8>,
// The position of the next byte to read in the buffer.
buffer: [u8; BUFFER_SIZE],
// The position of the next byte to read from the buffer.
cursor: usize,
// The preferred chunk size. This is just a hint.
preferred_chunk_size: usize,
// The position of the next byte to read into the buffer.
top: usize,
// The wrapped reader.
reader: R,
// Stashed error, if any.
error: Option<io::Error>,
// Whether we hit EOF on the underlying reader.
eof: bool,
}

impl<R: Read> PeekReader<R> {
/// Instanciates a new [`PeekReader`], wrapping the provided [`std::io::Read`]er and using the default chunk size
impl<R: Read, const BUFFER_SIZE: usize> PeekReader<R, BUFFER_SIZE> {
/// Instantiates a new [`PeekReader`], wrapping the provided [`std::io::Read`]er and using the default chunk size
pub fn new(reader: R) -> Self {
Self::with_chunk_size(reader, DEFAULT_CHUNK_SIZE)
}

/// Instanciates a new [`PeekReader`], wrapping the provided [`std::io::Read`]er and using the supplied chunk size
pub fn with_chunk_size(reader: R, preferred_chunk_size: usize) -> Self {
Self {
buffer: Vec::with_capacity(preferred_chunk_size),
buffer: [0; BUFFER_SIZE],
cursor: 0,
preferred_chunk_size,
top: 0,
reader,
error: None,
eof: false,
}
}

/// Peeks a specified amount of bytes from the internal buffer
///
/// If the internal buffer does not contain enough data, this function will read
/// from the underlying [`std::io::Read`]er until it does, an error occurs or no more data can be read (EOF).
///
/// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
/// will still return the peeked data.
///
pub fn peek(&mut self, amount: usize) -> io::Result<&[u8]> {
self.fetch(amount, false, false)
}

/// Peeks an exact amount of bytes from the internal buffer
///
/// If the internal buffer does not contain enough data, this function will read
Expand All @@ -79,29 +61,9 @@ impl<R: Read> PeekReader<R> {
/// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
/// will still return the peeked data.
///
pub fn peek_exact(&mut self, amount: usize) -> io::Result<&[u8]> {
self.fetch(amount, true, false)
}

/// Consumes a specified amount of bytes from the buffer
///
/// If the internal buffer does not contain enough data, this function will consume as much data as is buffered.
///
pub fn consume(&mut self, amount: usize) -> usize {
let amount = amount.min(self.buffer.len() - self.cursor);
self.cursor += amount;
amount
}

/// Reads a specified amount of bytes from the internal buffer
///
/// If the internal buffer does not contain enough data, this function will read
/// from the underlying [`std::io::Read`]er until it does, an error occurs or no more data can be read (EOF).
///
/// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
///
pub fn read(&mut self, amount: usize) -> io::Result<&[u8]> {
self.fetch(amount, false, true)
pub fn peek_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
let result = self.fetch(amount, false);
result
}

/// Reads a specified amount of bytes from the internal buffer
Expand All @@ -113,8 +75,8 @@ impl<R: Read> PeekReader<R> {
///
/// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
///
pub fn read_exact(&mut self, amount: usize) -> io::Result<&[u8]> {
self.fetch(amount, true, true)
pub fn read_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
self.fetch(amount, true)
}

/// Reads a byte from the internal buffer
Expand All @@ -126,11 +88,21 @@ impl<R: Read> PeekReader<R> {
///
/// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
///
pub fn read_u8(&mut self) -> io::Result<u8> {
pub fn read_u8(&mut self) -> Result<u8, MessageReadError> {
let buf = self.read_exact(1)?;
Ok(buf[0])
}

/// Consumes a specified amount of bytes from the buffer
///
/// If the internal buffer does not contain enough data, this function will consume as much data as is buffered.
///
pub fn consume(&mut self, amount: usize) -> usize {
let amount = amount.min(self.top - self.cursor);
self.cursor += amount;
amount
}

/// Returns an immutable reference to the underlying [`std::io::Read`]er
///
/// Reading directly from the underlying stream will cause data loss
Expand All @@ -146,72 +118,33 @@ impl<R: Read> PeekReader<R> {
}

/// Internal function to fetch data from the internal buffer and/or reader
fn fetch(&mut self, amount: usize, exact: bool, consume: bool) -> io::Result<&[u8]> {
let previous_len = self.buffer.len();
let mut buffered = previous_len - self.cursor;
fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> {
let buffered = self.top - self.cursor;

// the caller requested more bytes tha we have buffered, fetch them from the reader
// the caller requested more bytes than we have buffered, fetch them from the reader
if buffered < amount {
// if we got an earlier EOF, return it
if self.eof {
return Err(io::Error::new(
ErrorKind::UnexpectedEof,
"Unexpected EOF already returned in previous call to reader",
));
}
// if we have a stashed error, return it (and clear it)
if let Some(e) = self.error.take() {
if e.kind() == ErrorKind::UnexpectedEof {
self.eof = true;
}
return Err(e);
}

let needed = amount - buffered;
let chunk_size = self.preferred_chunk_size.max(needed);
let mut buf = vec![0u8; chunk_size];
let bytes_read = amount - buffered;
assert!(bytes_read < BUFFER_SIZE);
let mut buf = [0u8; BUFFER_SIZE];

// read needed bytes from reader
let mut read = 0;
while read < needed {
match self.reader.read(&mut buf[read..]) {
Ok(n) => {
if n == 0 {
break;
}
read += n;
}
Err(e) => {
self.error = Some(e);
break;
}
}
}
self.reader.read_exact(&mut buf[..bytes_read])?;

// if some bytes were read, add them to the buffer
if read > 0 {
if self.buffer.capacity() - previous_len < read {
// reallocate
self.buffer
.copy_within(self.cursor..self.cursor + buffered, 0);
self.buffer.truncate(buffered);
self.cursor = 0;
}
self.buffer.extend_from_slice(&buf[..read]);
buffered += read;
}

if buffered == 0 && self.error.is_some() {
return Err(self.error.take().unwrap());
if self.buffer.len() - self.top < bytes_read {
// reallocate
self.buffer.copy_within(self.cursor..self.top, 0);
self.cursor = 0;
self.top = buffered;
}
}
if exact && buffered < amount {
return Err(io::Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF"));
self.buffer[self.top..self.top + bytes_read].copy_from_slice(&buf[..bytes_read]);
self.top += bytes_read;
}

let result_len = amount.min(buffered);
let result = &self.buffer[self.cursor..self.cursor + result_len];
let result = &self.buffer[self.cursor..self.cursor + amount];
if consume {
self.cursor += result_len;
self.cursor += amount;
}
Ok(result)
}
Expand Down

0 comments on commit 033b069

Please sign in to comment.