diff --git a/atat/src/ingress.rs b/atat/src/ingress.rs index b99ca54..c2f8f87 100644 --- a/atat/src/ingress.rs +++ b/atat/src/ingress.rs @@ -58,6 +58,11 @@ pub trait AtatIngress { use embedded_io::Error; loop { let buf = self.write_buf(); + if buf.is_empty() { + warn!("Ingress buffer full, clearing"); + self.clear(); + continue; + } match serial.read(buf).await { Ok(received) => { if received > 0 { @@ -305,6 +310,7 @@ mod tests { self as atat, atat_derive::AtatUrc, digest::parser::take_until_including, response_slot::ResponseSlot, AtDigester, Response, UrcChannel, }; + use embedded_io::ErrorType; use super::*; @@ -449,4 +455,52 @@ mod tests { let response: &Response<100> = &response.borrow(); assert_eq!(&Response::default(), response); } + + #[tokio::test] + async fn read_from_can_recover_from_full_buffer() { + let res_slot = ResponseSlot::<30>::new(); + let urc_channel = UrcChannel::::new(); + let mut buf = [0; 30]; + + let mut ingress: Ingress<_, Urc, 30, 100, 1> = + Ingress::new(AtDigester::::new(), &mut buf, &res_slot, &urc_channel); + + let mut sub = urc_channel.subscribe().unwrap(); + + struct Reader { + data: &'static [u8], + pos: usize, + } + impl ErrorType for Reader { + type Error = embedded_io::ErrorKind; + } + impl embedded_io_async::Read for Reader { + async fn read(&mut self, buf: &mut [u8]) -> Result { + assert!(buf.len() > 0); + if self.pos >= self.data.len() { + // Simulate waiting on more data. + loop { + tokio::task::yield_now().await + } + } + let len = buf.len().min(self.data.len() - self.pos); + buf[..len].copy_from_slice(&self.data[self.pos..self.pos + len]); + self.pos += len; + Ok(len) + } + } + + let mut r = Reader { + // 35 bytes of garbage, followed by a valid URC + data: b"123456789012345678901234567890aaaaa\r\nCONNECT OK\r\n", + pos: 0, + }; + + tokio::select! { + _ = ingress.read_from(&mut r) => {} + m = sub.next_message_pure() => { + assert_eq!(Urc::ConnectOk, m); + } + } + } }