From aac2c2faab816b83e8a0ba4935537f83ad92cffa Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 7 Dec 2023 15:38:36 +0100 Subject: [PATCH 01/23] Let heapless be optional in serde_at --- serde_at/Cargo.toml | 5 +++-- serde_at/src/de/mod.rs | 3 ++- serde_at/src/lib.rs | 5 ++++- serde_at/src/ser/mod.rs | 30 ++++++++++++++++++++---------- 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/serde_at/Cargo.toml b/serde_at/Cargo.toml index 514c49ab..b7921389 100644 --- a/serde_at/Cargo.toml +++ b/serde_at/Cargo.toml @@ -12,9 +12,9 @@ name = "serde_at" version = "0.20.0" [dependencies] -heapless = { version = "^0.8", features = ["serde"] } +heapless = { version = "^0.8", features = ["serde"], optional = true } serde = { version = "^1", default-features = false } -heapless-bytes = { version = "0.3.0" } +heapless-bytes = { version = "0.3.0", optional = true } [dependencies.num-traits] version = "0.2" @@ -29,3 +29,4 @@ default = [] custom-error-messages = [] std = [] hex_str_arrays = [] +heapless = ["dep:heapless", "dep:heapless-bytes"] diff --git a/serde_at/src/de/mod.rs b/serde_at/src/de/mod.rs index bbbee439..74b97c96 100644 --- a/serde_at/src/de/mod.rs +++ b/serde_at/src/de/mod.rs @@ -10,6 +10,7 @@ use self::map::MapAccess; use self::seq::SeqAccess; mod enum_; +#[cfg(feature = "heapless")] pub mod length_delimited; mod map; mod seq; @@ -744,7 +745,7 @@ where from_slice(s.as_bytes()) } -#[cfg(test)] +#[cfg(all(test, feature = "heapless"))] mod tests { use super::length_delimited::LengthDelimited; use heapless::String; diff --git a/serde_at/src/lib.rs b/serde_at/src/lib.rs index 8ccc3ddb..eb768b2f 100644 --- a/serde_at/src/lib.rs +++ b/serde_at/src/lib.rs @@ -21,7 +21,10 @@ pub use serde; #[doc(inline)] pub use self::de::{from_slice, from_str, hex_str::HexStr}; #[doc(inline)] -pub use self::ser::{to_slice, to_string, to_vec, SerializeOptions}; +pub use self::ser::{to_slice, SerializeOptions}; + +#[cfg(feature = "heapless")] +pub use self::ser::{to_string, to_vec}; use core::mem::MaybeUninit; diff --git a/serde_at/src/ser/mod.rs b/serde_at/src/ser/mod.rs index 8cf8332a..a1b8b7af 100644 --- a/serde_at/src/ser/mod.rs +++ b/serde_at/src/ser/mod.rs @@ -1,12 +1,11 @@ //! Serialize a Rust data structure into AT Command strings -use core::fmt::{self, Write}; +use core::fmt; use serde::ser; -use heapless::{String, Vec}; - mod enum_; +#[cfg(feature = "heapless")] mod hex_str; mod struct_; @@ -167,15 +166,24 @@ macro_rules! serialize_signed { }}; } +#[cfg(feature = "heapless")] macro_rules! serialize_fmt { ($self:ident, $N:expr, $fmt:expr, $v:expr) => {{ - let mut s: String<$N> = String::new(); + use fmt::Write; + let mut s: heapless::String<$N> = heapless::String::new(); write!(&mut s, $fmt, $v).unwrap(); $self.extend_from_slice(s.as_bytes())?; Ok(()) }}; } +#[cfg(not(feature = "heapless"))] +macro_rules! serialize_fmt { + ($self:ident, $N:expr, $fmt:expr, $v:expr) => {{ + todo!() + }}; +} + impl<'a, 'b> ser::Serializer for &'a mut Serializer<'b> { type Ok = (); type Error = Error; @@ -388,29 +396,31 @@ impl<'a, 'b> ser::Serializer for &'a mut Serializer<'b> { } } +#[cfg(feature = "heapless")] /// Serializes the given data structure as a string pub fn to_string( value: &T, cmd: &str, options: SerializeOptions<'_>, -) -> Result> +) -> Result> where T: ser::Serialize + ?Sized, { - let vec: Vec = to_vec(value, cmd, options)?; - Ok(unsafe { String::from_utf8_unchecked(vec) }) + let vec: heapless::Vec = to_vec(value, cmd, options)?; + Ok(unsafe { heapless::String::from_utf8_unchecked(vec) }) } +#[cfg(feature = "heapless")] /// Serializes the given data structure as a byte vector pub fn to_vec( value: &T, cmd: &str, options: SerializeOptions<'_>, -) -> Result> +) -> Result> where T: ser::Serialize + ?Sized, { - let mut buf = Vec::new(); + let mut buf = heapless::Vec::new(); buf.resize_default(N).map_err(|_| Error::BufferFull)?; let len = to_slice(value, cmd, &mut buf, options)?; buf.truncate(len); @@ -505,7 +515,7 @@ impl ser::SerializeTuple for Unreachable { } } -#[cfg(test)] +#[cfg(all(test, feature = "heapless"))] mod tests { use super::*; use crate::HexStr; From bc7b0d4431a4faf33f7a92a1b7a7d98c7ba73630 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 7 Dec 2023 15:40:56 +0100 Subject: [PATCH 02/23] Remove length const generic on AtatCmd --- atat/Cargo.toml | 3 +++ atat/src/asynch/client.rs | 39 ++++++++++++++++++++----------------- atat/src/asynch/mod.rs | 11 ++++------- atat/src/blocking/client.rs | 38 +++++++++++++++++++----------------- atat/src/blocking/mod.rs | 7 ++----- atat/src/buffers.rs | 18 +++++++++-------- atat/src/derive.rs | 24 ++++++++++++----------- atat/src/lib.rs | 18 ++++++++++------- atat/src/traits.rs | 30 ++++++++++++++-------------- atat_derive/src/cmd.rs | 16 +++------------ 10 files changed, 102 insertions(+), 102 deletions(-) diff --git a/atat/Cargo.toml b/atat/Cargo.toml index edb41143..dfc460f8 100644 --- a/atat/Cargo.toml +++ b/atat/Cargo.toml @@ -37,6 +37,9 @@ defmt = { version = "^0.3", optional = true } [dev-dependencies] embassy-time = { version = "0.2", features = ["std", "generic-queue"] } critical-section = { version = "1.1", features = ["std"] } +serde_at = { path = "../serde_at", version = "^0.20.0", features = [ + "heapless", +] } tokio = { version = "1", features = ["macros", "rt"] } [features] diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index 5bde8046..87e5a5f5 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -14,6 +14,7 @@ pub struct Client<'a, W: Write, const INGRESS_BUF_SIZE: usize> { res_channel: &'a ResponseChannel, config: Config, cooldown_timer: Option, + buf: &'a mut [u8], } impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE> { @@ -21,19 +22,21 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE writer: W, res_channel: &'a ResponseChannel, config: Config, + buf: &'a mut [u8], ) -> Self { Self { writer, res_channel, config, cooldown_timer: None, + buf, } } - async fn send_command(&mut self, cmd: &[u8]) -> Result<(), Error> { + async fn send_command(&mut self, len: usize) -> Result<(), Error> { self.wait_cooldown_timer().await; - self.send_inner(cmd).await?; + self.send_inner(len).await?; self.start_cooldown_timer(); Ok(()) @@ -41,13 +44,13 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE async fn send_request( &mut self, - cmd: &[u8], + len: usize, timeout: Duration, ) -> Result, Error> { self.wait_cooldown_timer().await; let mut response_subscription = self.res_channel.subscriber().unwrap(); - self.send_inner(cmd).await?; + self.send_inner(len).await?; let response = self .with_timeout(timeout, response_subscription.next_message_pure()) @@ -58,14 +61,17 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE response } - async fn send_inner(&mut self, cmd: &[u8]) -> Result<(), Error> { - if cmd.len() < 50 { - debug!("Sending command: {:?}", LossyStr(cmd)); + async fn send_inner(&mut self, len: usize) -> Result<(), Error> { + if len < 50 { + debug!("Sending command: {:?}", LossyStr(&self.buf[..len])); } else { - debug!("Sending command with long payload ({} bytes)", cmd.len(),); + debug!("Sending command with long payload ({} bytes)", len); } - self.writer.write_all(cmd).await.map_err(|_| Error::Write)?; + self.writer + .write_all(&self.buf[..len]) + .await + .map_err(|_| Error::Write)?; self.writer.flush().await.map_err(|_| Error::Write)?; Ok(()) } @@ -107,18 +113,14 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE } impl AtatClient for Client<'_, W, INGRESS_BUF_SIZE> { - async fn send, const LEN: usize>( - &mut self, - cmd: &Cmd, - ) -> Result { - let cmd_vec = cmd.as_bytes(); - let cmd_slice = cmd.get_slice(&cmd_vec); + async fn send<'a, Cmd: AtatCmd>(&'a mut self, cmd: &'a Cmd) -> Result { + let len = cmd.write(&mut self.buf); if !Cmd::EXPECTS_RESPONSE_CODE { - self.send_command(cmd_slice).await?; + self.send_command(len).await?; cmd.parse(Ok(&[])) } else { let response = self - .send_request(cmd_slice, Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) + .send_request(len, Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) .await?; cmd.parse((&response).into()) } @@ -179,10 +181,11 @@ mod tests { static TX_CHANNEL: PubSubChannel, 1, 1, 1> = PubSubChannel::new(); static RES_CHANNEL: ResponseChannel = ResponseChannel::new(); + static mut BUF: [u8; 1000] = [0; 1000]; let tx_mock = crate::tx_mock::TxMock::new(TX_CHANNEL.publisher().unwrap()); let client: Client = - Client::new(tx_mock, &RES_CHANNEL, $config); + Client::new(tx_mock, &RES_CHANNEL, $config, unsafe { BUF.as_mut() }); ( client, TX_CHANNEL.subscriber().unwrap(), diff --git a/atat/src/asynch/mod.rs b/atat/src/asynch/mod.rs index d2978014..96976926 100644 --- a/atat/src/asynch/mod.rs +++ b/atat/src/asynch/mod.rs @@ -12,14 +12,11 @@ pub trait AtatClient { /// This function will also make sure that at least `self.config.cmd_cooldown` /// has passed since the last response or URC has been received, to allow /// the slave AT device time to deliver URC's. - async fn send, const LEN: usize>( - &mut self, - cmd: &Cmd, - ) -> Result; + async fn send<'a, Cmd: AtatCmd>(&'a mut self, cmd: &'a Cmd) -> Result; - async fn send_retry, const LEN: usize>( - &mut self, - cmd: &Cmd, + async fn send_retry<'a, Cmd: AtatCmd>( + &'a mut self, + cmd: &'a Cmd, ) -> Result { for attempt in 1..=Cmd::ATTEMPTS { if attempt > 1 { diff --git a/atat/src/blocking/client.rs b/atat/src/blocking/client.rs index 90ffddc9..c4c41f4d 100644 --- a/atat/src/blocking/client.rs +++ b/atat/src/blocking/client.rs @@ -19,6 +19,7 @@ where res_channel: &'a ResponseChannel, cooldown_timer: Option, config: Config, + buf: &'a mut [u8], } impl<'a, W, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE> @@ -29,19 +30,21 @@ where writer: W, res_channel: &'a ResponseChannel, config: Config, + buf: &'a mut [u8], ) -> Self { Self { writer, res_channel, cooldown_timer: None, config, + buf, } } - fn send_command(&mut self, cmd: &[u8]) -> Result<(), Error> { + fn send_command(&mut self, len: usize) -> Result<(), Error> { self.wait_cooldown_timer(); - self.send_inner(cmd)?; + self.send_inner(len)?; self.start_cooldown_timer(); Ok(()) @@ -49,13 +52,13 @@ where fn send_request( &mut self, - cmd: &[u8], + len: usize, timeout: Duration, ) -> Result, Error> { self.wait_cooldown_timer(); let mut response_subscription = self.res_channel.subscriber().unwrap(); - self.send_inner(cmd)?; + self.send_inner(len)?; let response = self .with_timeout(timeout, || response_subscription.try_next_message_pure()) @@ -65,14 +68,16 @@ where response } - fn send_inner(&mut self, cmd: &[u8]) -> Result<(), Error> { - if cmd.len() < 50 { - debug!("Sending command: {:?}", LossyStr(cmd)); + fn send_inner(&mut self, len: usize) -> Result<(), Error> { + if len < 50 { + debug!("Sending command: {:?}", LossyStr(&self.buf[..len])); } else { - debug!("Sending command with long payload ({} bytes)", cmd.len(),); + debug!("Sending command with long payload ({} bytes)", len,); } - self.writer.write_all(cmd).map_err(|_| Error::Write)?; + self.writer + .write_all(&self.buf[..len]) + .map_err(|_| Error::Write)?; self.writer.flush().map_err(|_| Error::Write)?; Ok(()) } @@ -109,18 +114,14 @@ impl AtatClient for Client<'_, W, INGRESS_BUF_ where W: Write, { - fn send, const LEN: usize>( - &mut self, - cmd: &Cmd, - ) -> Result { - let cmd_vec = cmd.as_bytes(); - let cmd_slice = cmd.get_slice(&cmd_vec); + fn send(&mut self, cmd: &Cmd) -> Result { + let len = cmd.write(&mut self.buf); if !Cmd::EXPECTS_RESPONSE_CODE { - self.send_command(cmd_slice)?; + self.send_command(len)?; cmd.parse(Ok(&[])) } else { let response = - self.send_request(cmd_slice, Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; + self.send_request(len, Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; cmd.parse((&response).into()) } } @@ -263,10 +264,11 @@ mod test { static TX_CHANNEL: PubSubChannel, 1, 1, 1> = PubSubChannel::new(); static RES_CHANNEL: ResponseChannel = ResponseChannel::new(); + static mut BUF: [u8; 1000] = [0; 1000]; let tx_mock = crate::tx_mock::TxMock::new(TX_CHANNEL.publisher().unwrap()); let client: Client = - Client::new(tx_mock, &RES_CHANNEL, $config); + Client::new(tx_mock, &RES_CHANNEL, $config, unsafe { BUF.as_mut() }); ( client, TX_CHANNEL.subscriber().unwrap(), diff --git a/atat/src/blocking/mod.rs b/atat/src/blocking/mod.rs index 48566120..e98820f0 100644 --- a/atat/src/blocking/mod.rs +++ b/atat/src/blocking/mod.rs @@ -18,12 +18,9 @@ pub trait AtatClient { /// This function will also make sure that at least `self.config.cmd_cooldown` /// has passed since the last response or URC has been received, to allow /// the slave AT device time to deliver URC's. - fn send, const LEN: usize>(&mut self, cmd: &A) -> Result; + fn send(&mut self, cmd: &A) -> Result; - fn send_retry, const LEN: usize>( - &mut self, - cmd: &A, - ) -> Result { + fn send_retry(&mut self, cmd: &A) -> Result { for attempt in 1..=A::ATTEMPTS { if attempt > 1 { debug!("Attempt {}:", attempt); diff --git a/atat/src/buffers.rs b/atat/src/buffers.rs index 4e19ef56..de124863 100644 --- a/atat/src/buffers.rs +++ b/atat/src/buffers.rs @@ -38,14 +38,15 @@ impl< > Buffers { #[cfg(feature = "async")] - pub fn split( - &self, + pub fn split<'a, W: embedded_io_async::Write, D: Digester>( + &'a self, writer: W, digester: D, config: Config, + buf: &'a mut [u8], ) -> ( - Ingress, - crate::asynch::Client, + Ingress<'a, D, Urc, INGRESS_BUF_SIZE, URC_CAPACITY, URC_SUBSCRIBERS>, + crate::asynch::Client<'a, W, INGRESS_BUF_SIZE>, ) { ( Ingress::new( @@ -53,15 +54,16 @@ impl< self.res_channel.publisher().unwrap(), self.urc_channel.publisher(), ), - crate::asynch::Client::new(writer, &self.res_channel, config), + crate::asynch::Client::new(writer, &self.res_channel, config, buf), ) } - pub fn split_blocking( - &self, + pub fn split_blocking<'a, W: Write, D: Digester>( + &'a self, writer: W, digester: D, config: Config, + buf: &'a mut [u8], ) -> ( Ingress, crate::blocking::Client, @@ -72,7 +74,7 @@ impl< self.res_channel.publisher().unwrap(), self.urc_channel.publisher(), ), - crate::blocking::Client::new(writer, &self.res_channel, config), + crate::blocking::Client::new(writer, &self.res_channel, config, buf), ) } } diff --git a/atat/src/derive.rs b/atat/src/derive.rs index d48f71d9..16eb0ad3 100644 --- a/atat/src/derive.rs +++ b/atat/src/derive.rs @@ -204,18 +204,20 @@ mod tests { #[test] fn test_length_serialize() { + let mut buf = [0; 360]; + let len = LengthTester { + x: 8, + y: String::try_from("SomeString").unwrap(), + z: 2, + w: "whatup", + a: SimpleEnum::A, + b: SimpleEnumU32::A, + c: SimpleEnumU32::B, + // d: Vec::new() + } + .write(&mut buf); assert_eq!( - LengthTester { - x: 8, - y: String::try_from("SomeString").unwrap(), - z: 2, - w: "whatup", - a: SimpleEnum::A, - b: SimpleEnumU32::A, - c: SimpleEnumU32::B, - // d: Vec::new() - } - .as_bytes(), + &buf[..len], Vec::::from_slice(b"AT+CFUN=8,\"SomeString\",2,\"whatup\",0,0,1\r\n").unwrap() ); } diff --git a/atat/src/lib.rs b/atat/src/lib.rs index caec3ef7..4566c7dd 100644 --- a/atat/src/lib.rs +++ b/atat/src/lib.rs @@ -35,13 +35,14 @@ //! //! impl AtatResp for GreetingText {}; //! -//! impl<'a> AtatCmd<64> for SetGreetingText<'a> { +//! impl<'a> AtatCmd for SetGreetingText<'a> { //! type Response = NoResponse; //! -//! fn as_bytes(&self) -> Vec { -//! let mut buf: Vec = Vec::new(); +//! fn write(&self, mut buf: &mut [u8]) -> usize { +//! let buf_len = buf.len(); +//! use embedded_io::Write; //! write!(buf, "AT+CSGT={}", self.text); -//! buf +//! buf_len - buf.len() //! } //! //! fn parse(&self, resp: Result<&[u8], InternalError>) -> Result { @@ -49,11 +50,14 @@ //! } //! } //! -//! impl AtatCmd<8> for GetGreetingText { +//! impl AtatCmd for GetGreetingText { //! type Response = GreetingText; //! -//! fn as_bytes(&self) -> Vec { -//! Vec::from_slice(b"AT+CSGT?").unwrap() +//! fn write(&self, mut buf: &mut [u8]) -> usize { +//! let cmd = b"AT+CSGT?"; +//! let len = cmd.len(); +//! buf[..len].copy_from_slice(cmd); +//! len //! } //! //! fn parse(&self, resp: Result<&[u8], InternalError>) -> Result { diff --git a/atat/src/traits.rs b/atat/src/traits.rs index 1acbd3c5..dbd94cdb 100644 --- a/atat/src/traits.rs +++ b/atat/src/traits.rs @@ -43,13 +43,14 @@ pub trait AtatUrc { /// /// impl AtatResp for NoResponse {}; /// -/// impl<'a> AtatCmd<64> for SetGreetingText<'a> { +/// impl<'a> AtatCmd for SetGreetingText<'a> { /// type Response = NoResponse; /// -/// fn as_bytes(&self) -> Vec { -/// let mut buf: Vec = Vec::new(); +/// fn write(&self, mut buf: &mut [u8]) -> usize { +/// let buf_len = buf.len(); +/// use embedded_io::Write; /// write!(buf, "AT+CSGT={}", self.text); -/// buf +/// buf_len - buf.len() /// } /// /// fn parse(&self, resp: Result<&[u8], InternalError>) -> Result { @@ -57,7 +58,7 @@ pub trait AtatUrc { /// } /// } /// ``` -pub trait AtatCmd { +pub trait AtatCmd { /// The type of the response. Must implement the `AtatResp` trait. type Response: AtatResp; @@ -80,26 +81,25 @@ pub trait AtatCmd { /// Implemented to enhance expandability of ATAT const EXPECTS_RESPONSE_CODE: bool = true; - /// Return the command as a heapless `Vec` of bytes. - fn as_bytes(&self) -> Vec; - - fn get_slice<'a>(&'a self, bytes: &'a Vec) -> &'a [u8] { - bytes - } + /// Write the command and return the number of written bytes. + fn write(&self, buf: &mut [u8]) -> usize; /// Parse the response into a `Self::Response` or `Error` instance. fn parse(&self, resp: Result<&[u8], InternalError>) -> Result; } -impl AtatResp for Vec where T: AtatResp {} +impl<'a, T, const L: usize> AtatResp for Vec where T: AtatResp {} impl AtatResp for String {} -impl AtatCmd for String { +impl<'a, const L: usize> AtatCmd for String { type Response = String<256>; - fn as_bytes(&self) -> Vec { - self.clone().into_bytes() + fn write(&self, buf: &mut [u8]) -> usize { + let bytes = self.as_bytes(); + let len = bytes.len(); + buf[..len].copy_from_slice(bytes); + len } fn parse(&self, resp: Result<&[u8], InternalError>) -> Result { diff --git a/atat_derive/src/cmd.rs b/atat_derive/src/cmd.rs index 3a4212e0..29132582 100644 --- a/atat_derive/src/cmd.rs +++ b/atat_derive/src/cmd.rs @@ -69,16 +69,6 @@ pub fn atat_cmd(input: TokenStream) -> TokenStream { None => quote! {}, }; - // let quote_escape_strings = !matches!(quote_escape_strings, Some(false)); - - let mut cmd_len = cmd_prefix.len() + cmd.len() + termination.len(); - if value_sep { - cmd_len += 1; - } - if quote_escape_strings { - cmd_len += 2; - } - let (field_names, field_names_str): (Vec<_>, Vec<_>) = variants .iter() .map(|f| { @@ -100,7 +90,7 @@ pub fn atat_cmd(input: TokenStream) -> TokenStream { const #ident_len: usize = #struct_len; #[automatically_derived] - impl #impl_generics atat::AtatCmd<{ #ident_len + #cmd_len }> for #ident #ty_generics #where_clause { + impl #impl_generics atat::AtatCmd for #ident #ty_generics #where_clause { type Response = #resp; #timeout @@ -112,8 +102,8 @@ pub fn atat_cmd(input: TokenStream) -> TokenStream { #reattempt_on_parse_err #[inline] - fn as_bytes(&self) -> atat::heapless::Vec { - match atat::serde_at::to_vec(self, #cmd, atat::serde_at::SerializeOptions { + fn write(&self, buf: &mut [u8]) -> usize { + match atat::serde_at::to_slice(self, #cmd, buf, atat::serde_at::SerializeOptions { value_sep: #value_sep, cmd_prefix: #cmd_prefix, termination: #termination, From d3997adc6fe5339f114819a8a76f60b03dc86d7c Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 19 Dec 2023 16:08:15 +0100 Subject: [PATCH 03/23] Swap client parameters to put config in the end --- atat/src/asynch/client.rs | 8 ++++---- atat/src/blocking/client.rs | 8 ++++---- atat/src/traits.rs | 4 ++-- examples/Cargo.toml | 1 + examples/src/bin/embassy.rs | 8 +++++++- examples/src/bin/std-tokio.rs | 3 ++- 6 files changed, 20 insertions(+), 12 deletions(-) diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index 87e5a5f5..4579a5af 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -12,24 +12,24 @@ use futures::{ pub struct Client<'a, W: Write, const INGRESS_BUF_SIZE: usize> { writer: W, res_channel: &'a ResponseChannel, + buf: &'a mut [u8], config: Config, cooldown_timer: Option, - buf: &'a mut [u8], } impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE> { pub fn new( writer: W, res_channel: &'a ResponseChannel, - config: Config, buf: &'a mut [u8], + config: Config, ) -> Self { Self { writer, res_channel, + buf, config, cooldown_timer: None, - buf, } } @@ -185,7 +185,7 @@ mod tests { let tx_mock = crate::tx_mock::TxMock::new(TX_CHANNEL.publisher().unwrap()); let client: Client = - Client::new(tx_mock, &RES_CHANNEL, $config, unsafe { BUF.as_mut() }); + Client::new(tx_mock, &RES_CHANNEL, unsafe { BUF.as_mut() }, $config); ( client, TX_CHANNEL.subscriber().unwrap(), diff --git a/atat/src/blocking/client.rs b/atat/src/blocking/client.rs index c4c41f4d..0beb02dc 100644 --- a/atat/src/blocking/client.rs +++ b/atat/src/blocking/client.rs @@ -17,9 +17,9 @@ where { writer: W, res_channel: &'a ResponseChannel, + buf: &'a mut [u8], cooldown_timer: Option, config: Config, - buf: &'a mut [u8], } impl<'a, W, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE> @@ -29,15 +29,15 @@ where pub fn new( writer: W, res_channel: &'a ResponseChannel, - config: Config, buf: &'a mut [u8], + config: Config, ) -> Self { Self { writer, res_channel, + buf, cooldown_timer: None, config, - buf, } } @@ -268,7 +268,7 @@ mod test { let tx_mock = crate::tx_mock::TxMock::new(TX_CHANNEL.publisher().unwrap()); let client: Client = - Client::new(tx_mock, &RES_CHANNEL, $config, unsafe { BUF.as_mut() }); + Client::new(tx_mock, &RES_CHANNEL, unsafe { BUF.as_mut() }, $config); ( client, TX_CHANNEL.subscriber().unwrap(), diff --git a/atat/src/traits.rs b/atat/src/traits.rs index dbd94cdb..275c352d 100644 --- a/atat/src/traits.rs +++ b/atat/src/traits.rs @@ -88,11 +88,11 @@ pub trait AtatCmd { fn parse(&self, resp: Result<&[u8], InternalError>) -> Result; } -impl<'a, T, const L: usize> AtatResp for Vec where T: AtatResp {} +impl AtatResp for Vec where T: AtatResp {} impl AtatResp for String {} -impl<'a, const L: usize> AtatCmd for String { +impl AtatCmd for String { type Response = String<256>; fn write(&self, buf: &mut [u8]) -> usize { diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 20a0321a..35e13493 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -49,6 +49,7 @@ tokio = { version = "1.26", default-features = false, features = [ "macros", ], optional = true } tokio-serial = { version = "5.4.4", optional = true } +static_cell = "2" [features] embedded = [ diff --git a/examples/src/bin/embassy.rs b/examples/src/bin/embassy.rs index d6aa4481..e3190357 100644 --- a/examples/src/bin/embassy.rs +++ b/examples/src/bin/embassy.rs @@ -57,7 +57,13 @@ async fn main(spawner: Spawner) { &RES_CHANNEL, &URC_CHANNEL, ); - let mut client = Client::new(writer, RES_CHANNEL.subscriber(), atat::Config::default()); + let buf = StaticCell::make_static!([0; 1024]); + let mut client = Client::new( + writer, + RES_CHANNEL.subscriber(), + buf, + atat::Config::default(), + ); spawner.spawn(ingress_task(ingress, reader)).unwrap(); diff --git a/examples/src/bin/std-tokio.rs b/examples/src/bin/std-tokio.rs index f3ffd710..71eec9e7 100644 --- a/examples/src/bin/std-tokio.rs +++ b/examples/src/bin/std-tokio.rs @@ -27,7 +27,8 @@ async fn main() -> ! { &RES_CHANNEL, &URC_CHANNEL, ); - let mut client = Client::new(FromTokio::new(writer), &RES_CHANNEL, Config::default()); + let buf = StaticCell::make_static!([0; 1024]); + let mut client = Client::new(FromTokio::new(writer), &RES_CHANNEL, buf, Config::default()); tokio::spawn(ingress_task(ingress, FromTokio::new(reader))); From 28340c5d78d93b416f7f257201825d69d0482758 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 19 Dec 2023 16:31:18 +0100 Subject: [PATCH 04/23] Add LEN as a const associated to AtatCmd --- atat/src/traits.rs | 6 ++++++ atat_derive/src/cmd.rs | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/atat/src/traits.rs b/atat/src/traits.rs index 275c352d..4880e201 100644 --- a/atat/src/traits.rs +++ b/atat/src/traits.rs @@ -45,8 +45,10 @@ pub trait AtatUrc { /// /// impl<'a> AtatCmd for SetGreetingText<'a> { /// type Response = NoResponse; +/// const LEN: usize = 64; /// /// fn write(&self, mut buf: &mut [u8]) -> usize { +/// assert!(buf.len() >= Self::LEN); /// let buf_len = buf.len(); /// use embedded_io::Write; /// write!(buf, "AT+CSGT={}", self.text); @@ -62,6 +64,9 @@ pub trait AtatCmd { /// The type of the response. Must implement the `AtatResp` trait. type Response: AtatResp; + /// The size of the buffer required to write the request. + const LEN: usize; + /// Whether or not this command can be aborted. const CAN_ABORT: bool = false; @@ -94,6 +99,7 @@ impl AtatResp for String {} impl AtatCmd for String { type Response = String<256>; + const LEN: usize = L; fn write(&self, buf: &mut [u8]) -> usize { let bytes = self.as_bytes(); diff --git a/atat_derive/src/cmd.rs b/atat_derive/src/cmd.rs index 29132582..09c820d0 100644 --- a/atat_derive/src/cmd.rs +++ b/atat_derive/src/cmd.rs @@ -69,6 +69,14 @@ pub fn atat_cmd(input: TokenStream) -> TokenStream { None => quote! {}, }; + let mut cmd_len = cmd_prefix.len() + cmd.len() + termination.len(); + if value_sep { + cmd_len += 1; + } + if quote_escape_strings { + cmd_len += 2; + } + let (field_names, field_names_str): (Vec<_>, Vec<_>) = variants .iter() .map(|f| { @@ -93,6 +101,8 @@ pub fn atat_cmd(input: TokenStream) -> TokenStream { impl #impl_generics atat::AtatCmd for #ident #ty_generics #where_clause { type Response = #resp; + const LEN: usize = { #ident_len + #cmd_len }; + #timeout #abortable From a1465bf6cd1c484a921b7b0b27e1dd6fade4917a Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 19 Dec 2023 17:15:53 +0100 Subject: [PATCH 05/23] Rename LEN to MAX_LEN --- atat/src/traits.rs | 8 ++++---- atat_derive/src/cmd.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/atat/src/traits.rs b/atat/src/traits.rs index 4880e201..aa07d0d2 100644 --- a/atat/src/traits.rs +++ b/atat/src/traits.rs @@ -45,10 +45,10 @@ pub trait AtatUrc { /// /// impl<'a> AtatCmd for SetGreetingText<'a> { /// type Response = NoResponse; -/// const LEN: usize = 64; +/// const MAX_LEN: usize = 64; /// /// fn write(&self, mut buf: &mut [u8]) -> usize { -/// assert!(buf.len() >= Self::LEN); +/// assert!(buf.len() >= Self::MAX_LEN); /// let buf_len = buf.len(); /// use embedded_io::Write; /// write!(buf, "AT+CSGT={}", self.text); @@ -65,7 +65,7 @@ pub trait AtatCmd { type Response: AtatResp; /// The size of the buffer required to write the request. - const LEN: usize; + const MAX_LEN: usize; /// Whether or not this command can be aborted. const CAN_ABORT: bool = false; @@ -99,7 +99,7 @@ impl AtatResp for String {} impl AtatCmd for String { type Response = String<256>; - const LEN: usize = L; + const MAX_LEN: usize = L; fn write(&self, buf: &mut [u8]) -> usize { let bytes = self.as_bytes(); diff --git a/atat_derive/src/cmd.rs b/atat_derive/src/cmd.rs index 09c820d0..902d0f70 100644 --- a/atat_derive/src/cmd.rs +++ b/atat_derive/src/cmd.rs @@ -101,7 +101,7 @@ pub fn atat_cmd(input: TokenStream) -> TokenStream { impl #impl_generics atat::AtatCmd for #ident #ty_generics #where_clause { type Response = #resp; - const LEN: usize = { #ident_len + #cmd_len }; + const MAX_LEN: usize = { #ident_len + #cmd_len }; #timeout From 4c0277b4da521258e422c138e3103d2b0a2bc13e Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Wed, 20 Dec 2023 08:45:29 +0100 Subject: [PATCH 06/23] Rename ResponseChannel to ResponseSlot --- atat/src/asynch/client.rs | 73 ++++++++++++------------------ atat/src/blocking/client.rs | 84 +++++++++++++---------------------- atat/src/ingress.rs | 37 ++++++--------- atat/src/lib.rs | 4 +- atat/src/response_channel.rs | 15 ------- atat/src/response_slot.rs | 6 +++ examples/src/bin/embassy.rs | 13 ++---- examples/src/bin/std-tokio.rs | 8 ++-- 8 files changed, 89 insertions(+), 151 deletions(-) delete mode 100644 atat/src/response_channel.rs create mode 100644 atat/src/response_slot.rs diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index 4579a5af..b6f27453 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -1,7 +1,5 @@ use super::AtatClient; -use crate::{ - helpers::LossyStr, response_channel::ResponseChannel, AtatCmd, Config, Error, Response, -}; +use crate::{helpers::LossyStr, response_slot::ResponseSlot, AtatCmd, Config, Error, Response}; use embassy_time::{Duration, Instant, TimeoutError, Timer}; use embedded_io_async::Write; use futures::{ @@ -11,7 +9,7 @@ use futures::{ pub struct Client<'a, W: Write, const INGRESS_BUF_SIZE: usize> { writer: W, - res_channel: &'a ResponseChannel, + res_slot: &'a ResponseSlot, buf: &'a mut [u8], config: Config, cooldown_timer: Option, @@ -20,62 +18,51 @@ pub struct Client<'a, W: Write, const INGRESS_BUF_SIZE: usize> { impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE> { pub fn new( writer: W, - res_channel: &'a ResponseChannel, + res_slot: &'a ResponseSlot, buf: &'a mut [u8], config: Config, ) -> Self { Self { writer, - res_channel, + res_slot, buf, config, cooldown_timer: None, } } - async fn send_command(&mut self, len: usize) -> Result<(), Error> { - self.wait_cooldown_timer().await; - - self.send_inner(len).await?; - - self.start_cooldown_timer(); - Ok(()) - } - - async fn send_request( - &mut self, - len: usize, - timeout: Duration, - ) -> Result, Error> { - self.wait_cooldown_timer().await; - - let mut response_subscription = self.res_channel.subscriber().unwrap(); - self.send_inner(len).await?; - - let response = self - .with_timeout(timeout, response_subscription.next_message_pure()) - .await - .map_err(|_| Error::Timeout); - - self.start_cooldown_timer(); - response - } - - async fn send_inner(&mut self, len: usize) -> Result<(), Error> { + async fn send_request(&mut self, len: usize) -> Result<(), Error> { if len < 50 { debug!("Sending command: {:?}", LossyStr(&self.buf[..len])); } else { debug!("Sending command with long payload ({} bytes)", len); } + self.wait_cooldown_timer().await; + + // Clear any pending response + self.res_slot.reset(); + + // Write request self.writer .write_all(&self.buf[..len]) .await .map_err(|_| Error::Write)?; self.writer.flush().await.map_err(|_| Error::Write)?; + + self.start_cooldown_timer(); Ok(()) } + async fn receive_response( + &mut self, + timeout: Duration, + ) -> Result, Error> { + self.with_timeout(timeout, self.res_slot.wait()) + .await + .map_err(|_| Error::Timeout) + } + async fn with_timeout( &self, timeout: Duration, @@ -115,12 +102,12 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE impl AtatClient for Client<'_, W, INGRESS_BUF_SIZE> { async fn send<'a, Cmd: AtatCmd>(&'a mut self, cmd: &'a Cmd) -> Result { let len = cmd.write(&mut self.buf); + self.send_request(len).await?; if !Cmd::EXPECTS_RESPONSE_CODE { - self.send_command(len).await?; cmd.parse(Ok(&[])) } else { let response = self - .send_request(len, Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) + .receive_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) .await?; cmd.parse((&response).into()) } @@ -180,17 +167,13 @@ mod tests { ($config:expr) => {{ static TX_CHANNEL: PubSubChannel, 1, 1, 1> = PubSubChannel::new(); - static RES_CHANNEL: ResponseChannel = ResponseChannel::new(); + static RES_SLOT: ResponseSlot = ResponseSlot::new(); static mut BUF: [u8; 1000] = [0; 1000]; let tx_mock = crate::tx_mock::TxMock::new(TX_CHANNEL.publisher().unwrap()); let client: Client = - Client::new(tx_mock, &RES_CHANNEL, unsafe { BUF.as_mut() }, $config); - ( - client, - TX_CHANNEL.subscriber().unwrap(), - RES_CHANNEL.publisher().unwrap(), - ) + Client::new(tx_mock, &RES_SLOT, unsafe { BUF.as_mut() }, $config); + (client, TX_CHANNEL.subscriber().unwrap(), &RES_SLOT) }}; } @@ -267,7 +250,7 @@ mod tests { tx.next_message_pure().await; // Emit response in the extended timeout timeframe Timer::after(Duration::from_millis(300)).await; - rx.try_publish(Response::default()).unwrap(); + rx.signal(Response::default()); }); let send = tokio::task::spawn(async move { diff --git a/atat/src/blocking/client.rs b/atat/src/blocking/client.rs index 0beb02dc..fad7bef1 100644 --- a/atat/src/blocking/client.rs +++ b/atat/src/blocking/client.rs @@ -2,9 +2,7 @@ use embassy_time::{Duration, Instant, TimeoutError}; use embedded_io::Write; use super::{blocking_timer::BlockingTimer, AtatClient}; -use crate::{ - helpers::LossyStr, response_channel::ResponseChannel, AtatCmd, Config, Error, Response, -}; +use crate::{helpers::LossyStr, response_slot::ResponseSlot, AtatCmd, Config, Error, Response}; /// Client responsible for handling send, receive and timeout from the /// userfacing side. The client is decoupled from the ingress-manager through @@ -16,7 +14,7 @@ where W: Write, { writer: W, - res_channel: &'a ResponseChannel, + res_slot: &'a ResponseSlot, buf: &'a mut [u8], cooldown_timer: Option, config: Config, @@ -28,60 +26,46 @@ where { pub fn new( writer: W, - res_channel: &'a ResponseChannel, + res_slot: &'a ResponseSlot, buf: &'a mut [u8], config: Config, ) -> Self { Self { writer, - res_channel, + res_slot, buf, cooldown_timer: None, config, } } - fn send_command(&mut self, len: usize) -> Result<(), Error> { - self.wait_cooldown_timer(); - - self.send_inner(len)?; - - self.start_cooldown_timer(); - Ok(()) - } - - fn send_request( - &mut self, - len: usize, - timeout: Duration, - ) -> Result, Error> { - self.wait_cooldown_timer(); - - let mut response_subscription = self.res_channel.subscriber().unwrap(); - self.send_inner(len)?; - - let response = self - .with_timeout(timeout, || response_subscription.try_next_message_pure()) - .map_err(|_| Error::Timeout); - - self.start_cooldown_timer(); - response - } - - fn send_inner(&mut self, len: usize) -> Result<(), Error> { + fn send_request(&mut self, len: usize) -> Result<(), Error> { if len < 50 { debug!("Sending command: {:?}", LossyStr(&self.buf[..len])); } else { debug!("Sending command with long payload ({} bytes)", len,); } + self.wait_cooldown_timer(); + + // Clear any pending response + self.res_slot.reset(); + + // Write request self.writer .write_all(&self.buf[..len]) .map_err(|_| Error::Write)?; self.writer.flush().map_err(|_| Error::Write)?; + + self.start_cooldown_timer(); Ok(()) } + fn receive_response(&mut self, timeout: Duration) -> Result, Error> { + self.with_timeout(timeout, || self.res_slot.try_take()) + .map_err(|_| Error::Timeout) + } + fn with_timeout( &self, timeout: Duration, @@ -116,12 +100,12 @@ where { fn send(&mut self, cmd: &Cmd) -> Result { let len = cmd.write(&mut self.buf); + self.send_request(len)?; if !Cmd::EXPECTS_RESPONSE_CODE { - self.send_command(len)?; cmd.parse(Ok(&[])) } else { let response = - self.send_request(len, Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; + self.receive_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; cmd.parse((&response).into()) } } @@ -263,17 +247,13 @@ mod test { ($config:expr) => {{ static TX_CHANNEL: PubSubChannel, 1, 1, 1> = PubSubChannel::new(); - static RES_CHANNEL: ResponseChannel = ResponseChannel::new(); + static RES_SLOT: ResponseSlot = ResponseSlot::new(); static mut BUF: [u8; 1000] = [0; 1000]; let tx_mock = crate::tx_mock::TxMock::new(TX_CHANNEL.publisher().unwrap()); let client: Client = - Client::new(tx_mock, &RES_CHANNEL, unsafe { BUF.as_mut() }, $config); - ( - client, - TX_CHANNEL.subscriber().unwrap(), - RES_CHANNEL.publisher().unwrap(), - ) + Client::new(tx_mock, &RES_SLOT, unsafe { BUF.as_mut() }, $config); + (client, TX_CHANNEL.subscriber().unwrap(), &RES_SLOT) }}; } @@ -285,7 +265,7 @@ mod test { let sent = tokio::spawn(async move { tx.next_message_pure().await; - rx.try_publish(Err(InternalError::Error).into()).unwrap(); + rx.signal(Err(InternalError::Error).into()); }); tokio::task::spawn_blocking(move || { @@ -308,7 +288,7 @@ mod test { let sent = tokio::spawn(async move { tx.next_message_pure().await; - rx.try_publish(Err(InternalError::Error).into()).unwrap(); + rx.signal(Err(InternalError::Error).into()); }); tokio::task::spawn_blocking(move || { @@ -336,10 +316,10 @@ mod test { let sent = tokio::spawn(async move { let sent0 = tx.next_message_pure().await; - rx.try_publish(Response::default()).unwrap(); + rx.signal(Response::default()); let sent1 = tx.next_message_pure().await; - rx.try_publish(Response::default()).unwrap(); + rx.signal(Response::default()); (sent0, sent1) }); @@ -367,7 +347,7 @@ mod test { let sent = tokio::spawn(async move { let sent = tx.next_message_pure().await; - rx.try_publish(Response::default()).unwrap(); + rx.signal(Response::default()); sent }); @@ -402,10 +382,10 @@ mod test { let sent = tokio::spawn(async move { let sent0 = tx.next_message_pure().await; - rx.try_publish(Response::ok(response0)).unwrap(); + rx.signal(Response::ok(response0)); let sent1 = tx.next_message_pure().await; - rx.try_publish(Response::ok(response1)).unwrap(); + rx.signal(Response::ok(response1)); (sent0, sent1) }); @@ -446,7 +426,7 @@ mod test { let sent = tokio::spawn(async move { tx.next_message_pure().await; - rx.try_publish(Response::ok(b"+CUN: 22,16,22")).unwrap(); + rx.signal(Response::ok(b"+CUN: 22,16,22")); }); tokio::task::spawn_blocking(move || { @@ -531,7 +511,7 @@ mod test { tx.next_message_pure().await; // Emit response in the extended timeout timeframe Timer::after(Duration::from_millis(300)).await; - rx.try_publish(Response::default()).unwrap(); + rx.signal(Response::default()); }); tokio::task::spawn_blocking(move || { diff --git a/atat/src/ingress.rs b/atat/src/ingress.rs index 5bf46198..6e839dd3 100644 --- a/atat/src/ingress.rs +++ b/atat/src/ingress.rs @@ -1,12 +1,11 @@ use crate::{ - helpers::LossyStr, response_channel::ResponsePublisher, urc_channel::UrcPublisher, AtatUrc, - DigestResult, Digester, Response, ResponseChannel, UrcChannel, + helpers::LossyStr, urc_channel::UrcPublisher, AtatUrc, DigestResult, Digester, Response, + ResponseSlot, UrcChannel, }; #[derive(Debug, PartialEq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum Error { - ResponseQueueFull, UrcChannelFull, } @@ -89,7 +88,7 @@ pub struct Ingress< digester: D, buf: [u8; INGRESS_BUF_SIZE], pos: usize, - res_publisher: ResponsePublisher<'a, INGRESS_BUF_SIZE>, + res_slot: &'a ResponseSlot, urc_publisher: UrcPublisher<'a, Urc, URC_CAPACITY, URC_SUBSCRIBERS>, } @@ -104,14 +103,14 @@ impl< { pub fn new( digester: D, - res_channel: &'a ResponseChannel, + res_slot: &'a ResponseSlot, urc_channel: &'a UrcChannel, ) -> Self { Self { digester, buf: [0; INGRESS_BUF_SIZE], pos: 0, - res_publisher: res_channel.publisher().unwrap(), + res_slot, urc_publisher: urc_channel.0.publisher().unwrap(), } } @@ -150,9 +149,7 @@ impl< (DigestResult::Prompt(prompt), swallowed) => { debug!("Received prompt ({}/{})", swallowed, self.pos); - self.res_publisher - .try_publish(Response::Prompt(prompt)) - .map_err(|_| Error::ResponseQueueFull)?; + self.res_slot.signal(Response::Prompt(prompt)); swallowed } (DigestResult::Urc(urc_line), swallowed) => { @@ -195,9 +192,7 @@ impl< } } - self.res_publisher - .try_publish(resp.into()) - .map_err(|_| Error::ResponseQueueFull)?; + self.res_slot.signal(resp.into()); swallowed } }; @@ -235,9 +230,7 @@ impl< (DigestResult::Prompt(prompt), swallowed) => { debug!("Received prompt ({}/{})", swallowed, self.pos); - if let Err(frame) = self.res_publisher.try_publish(Response::Prompt(prompt)) { - self.res_publisher.publish(frame).await; - } + self.res_slot.signal(Response::Prompt(prompt)); swallowed } (DigestResult::Urc(urc_line), swallowed) => { @@ -280,9 +273,7 @@ impl< } } - if let Err(frame) = self.res_publisher.try_publish(resp.into()) { - self.res_publisher.publish(frame).await; - } + self.res_slot.signal(resp.into()); swallowed } }; @@ -304,8 +295,7 @@ impl< #[cfg(test)] mod tests { use crate::{ - self as atat, atat_derive::AtatUrc, response_channel::ResponseChannel, AtDigester, - UrcChannel, + self as atat, atat_derive::AtatUrc, response_slot::ResponseSlot, AtDigester, UrcChannel, }; use super::*; @@ -320,11 +310,10 @@ mod tests { #[test] fn advance_can_processes_multiple_digest_results() { - let res_channel = ResponseChannel::<100>::new(); - let mut res_subscription = res_channel.subscriber().unwrap(); + let res_slot = ResponseSlot::<100>::new(); let urc_channel = UrcChannel::::new(); let mut ingress: Ingress<_, Urc, 100, 10, 1> = - Ingress::new(AtDigester::::new(), &res_channel, &urc_channel); + Ingress::new(AtDigester::::new(), &res_slot, &urc_channel); let mut sub = urc_channel.subscribe().unwrap(); @@ -336,7 +325,7 @@ mod tests { assert_eq!(Urc::ConnectOk, sub.try_next_message_pure().unwrap()); assert_eq!(Urc::ConnectFail, sub.try_next_message_pure().unwrap()); - let response = res_subscription.try_next_message_pure().unwrap(); + let response = res_slot.try_take().unwrap(); assert_eq!(Response::default(), response); } } diff --git a/atat/src/lib.rs b/atat/src/lib.rs index 4bd6e7e3..4e5aa3f0 100644 --- a/atat/src/lib.rs +++ b/atat/src/lib.rs @@ -232,7 +232,7 @@ mod error; pub mod helpers; mod ingress; mod response; -pub mod response_channel; +pub mod response_slot; mod traits; #[cfg(test)] mod tx_mock; @@ -269,7 +269,7 @@ pub use digest::{AtDigester, AtDigester as DefaultDigester, DigestResult, Digest pub use error::{CmeError, CmsError, ConnectionError, Error, InternalError}; pub use ingress::{AtatIngress, Error as IngressError, Ingress}; pub use response::Response; -pub use response_channel::ResponseChannel; +pub use response_slot::ResponseSlot; pub use traits::{AtatCmd, AtatResp, AtatUrc}; pub use urc_channel::{UrcChannel, UrcSubscription}; diff --git a/atat/src/response_channel.rs b/atat/src/response_channel.rs deleted file mode 100644 index f6ec4d33..00000000 --- a/atat/src/response_channel.rs +++ /dev/null @@ -1,15 +0,0 @@ -use embassy_sync::{ - blocking_mutex::raw::CriticalSectionRawMutex, - pubsub::{PubSubChannel, Publisher, Subscriber}, -}; - -use crate::Response; - -pub type ResponseChannel = - PubSubChannel, 1, 1, 1>; - -pub type ResponsePublisher<'sub, const INGRESS_BUF_SIZE: usize> = - Publisher<'sub, CriticalSectionRawMutex, Response, 1, 1, 1>; - -pub type ResponseSubscription<'sub, const INGRESS_BUF_SIZE: usize> = - Subscriber<'sub, CriticalSectionRawMutex, Response, 1, 1, 1>; diff --git a/atat/src/response_slot.rs b/atat/src/response_slot.rs new file mode 100644 index 00000000..d7100768 --- /dev/null +++ b/atat/src/response_slot.rs @@ -0,0 +1,6 @@ +use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, signal::Signal}; + +use crate::Response; + +pub type ResponseSlot = + Signal>; diff --git a/examples/src/bin/embassy.rs b/examples/src/bin/embassy.rs index e3190357..a294969a 100644 --- a/examples/src/bin/embassy.rs +++ b/examples/src/bin/embassy.rs @@ -5,7 +5,7 @@ use atat::{ asynch::{AtatClient, Client}, - AtatIngress, DefaultDigester, Ingress, ResponseChannel, UrcChannel, + AtatIngress, DefaultDigester, Ingress, ResponseSlot, UrcChannel, }; use atat_examples::common; use embassy_executor::Spawner; @@ -50,20 +50,15 @@ async fn main(spawner: Spawner) { ); let (reader, writer) = uart.split(); - static RES_CHANNEL: ResponseChannel = ResponseChannel::new(); + static RES_SLOT: ResponseSlot = ResponseSlot::new(); static URC_CHANNEL: UrcChannel = UrcChannel::new(); let ingress = Ingress::new( DefaultDigester::::default(), - &RES_CHANNEL, + &RES_SLOT, &URC_CHANNEL, ); let buf = StaticCell::make_static!([0; 1024]); - let mut client = Client::new( - writer, - RES_CHANNEL.subscriber(), - buf, - atat::Config::default(), - ); + let mut client = Client::new(writer, &RES_SLOT, buf, atat::Config::default()); spawner.spawn(ingress_task(ingress, reader)).unwrap(); diff --git a/examples/src/bin/std-tokio.rs b/examples/src/bin/std-tokio.rs index 71eec9e7..f71339ca 100644 --- a/examples/src/bin/std-tokio.rs +++ b/examples/src/bin/std-tokio.rs @@ -4,7 +4,7 @@ use atat_examples::common; use atat::{ asynch::{AtatClient, Client}, - AtatIngress, Config, DefaultDigester, Ingress, ResponseChannel, UrcChannel, + AtatIngress, Config, DefaultDigester, Ingress, ResponseSlot, UrcChannel, }; use embedded_io_adapters::tokio_1::FromTokio; use std::process::exit; @@ -20,15 +20,15 @@ async fn main() -> ! { let (reader, writer) = SerialStream::pair().expect("Failed to create serial pair"); - static RES_CHANNEL: ResponseChannel = ResponseChannel::new(); + static RES_SLOT: ResponseSlot = ResponseSlot::new(); static URC_CHANNEL: UrcChannel = UrcChannel::new(); let ingress = Ingress::new( DefaultDigester::::default(), - &RES_CHANNEL, + &RES_SLOT, &URC_CHANNEL, ); let buf = StaticCell::make_static!([0; 1024]); - let mut client = Client::new(FromTokio::new(writer), &RES_CHANNEL, buf, Config::default()); + let mut client = Client::new(FromTokio::new(writer), &RES_SLOT, buf, Config::default()); tokio::spawn(ingress_task(ingress, FromTokio::new(reader))); From a7f4578c27307237b0af7e289ff865d15eb92bd9 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Wed, 20 Dec 2023 09:02:22 +0100 Subject: [PATCH 07/23] Add missing MAX_LEN in documentation --- atat/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/atat/src/lib.rs b/atat/src/lib.rs index 4e5aa3f0..f9989cdc 100644 --- a/atat/src/lib.rs +++ b/atat/src/lib.rs @@ -37,6 +37,7 @@ //! //! impl<'a> AtatCmd for SetGreetingText<'a> { //! type Response = NoResponse; +//! const MAX_LEN: usize = 64; //! //! fn write(&self, mut buf: &mut [u8]) -> usize { //! let buf_len = buf.len(); @@ -52,6 +53,7 @@ //! //! impl AtatCmd for GetGreetingText { //! type Response = GreetingText; +//! const MAX_LEN: usize = 8; //! //! fn write(&self, mut buf: &mut [u8]) -> usize { //! let cmd = b"AT+CSGT?"; From caadee31b5b8bd6e6dc30e13e3fc8bbddc7e804d Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Wed, 20 Dec 2023 11:15:12 +0100 Subject: [PATCH 08/23] Zero copy response in the response slot --- atat/src/asynch/client.rs | 18 ++++----- atat/src/blocking/client.rs | 43 ++++++++++++--------- atat/src/ingress.rs | 30 ++++++++++----- atat/src/response_slot.rs | 76 +++++++++++++++++++++++++++++++++++-- 4 files changed, 127 insertions(+), 40 deletions(-) diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index b6f27453..8ae7321e 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -40,7 +40,7 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE self.wait_cooldown_timer().await; - // Clear any pending response + // Clear any pending response signal self.res_slot.reset(); // Write request @@ -54,10 +54,7 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE Ok(()) } - async fn receive_response( - &mut self, - timeout: Duration, - ) -> Result, Error> { + async fn wait_response(&mut self, timeout: Duration) -> Result<(), Error> { self.with_timeout(timeout, self.res_slot.wait()) .await .map_err(|_| Error::Timeout) @@ -106,10 +103,11 @@ impl AtatClient for Client<'_, W, INGRE if !Cmd::EXPECTS_RESPONSE_CODE { cmd.parse(Ok(&[])) } else { - let response = self - .receive_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) + self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) .await?; - cmd.parse((&response).into()) + let response = self.res_slot.get(); + let response: &Response = &response.borrow(); + cmd.parse(response.into()) } } } @@ -119,7 +117,7 @@ mod tests { use super::*; use crate as atat; use crate::atat_derive::{AtatCmd, AtatEnum, AtatResp}; - use crate::{Error, Response}; + use crate::Error; use core::sync::atomic::{AtomicU64, Ordering}; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::pubsub::PubSubChannel; @@ -250,7 +248,7 @@ mod tests { tx.next_message_pure().await; // Emit response in the extended timeout timeframe Timer::after(Duration::from_millis(300)).await; - rx.signal(Response::default()); + rx.signal_response(Ok(&[])).unwrap(); }); let send = tokio::task::spawn(async move { diff --git a/atat/src/blocking/client.rs b/atat/src/blocking/client.rs index fad7bef1..844808ed 100644 --- a/atat/src/blocking/client.rs +++ b/atat/src/blocking/client.rs @@ -48,7 +48,7 @@ where self.wait_cooldown_timer(); - // Clear any pending response + // Clear any pending response signal self.res_slot.reset(); // Write request @@ -61,9 +61,15 @@ where Ok(()) } - fn receive_response(&mut self, timeout: Duration) -> Result, Error> { - self.with_timeout(timeout, || self.res_slot.try_take()) - .map_err(|_| Error::Timeout) + fn wait_response(&mut self, timeout: Duration) -> Result<(), Error> { + self.with_timeout(timeout, || { + if self.res_slot.available() { + Some(()) + } else { + None + } + }) + .map_err(|_| Error::Timeout) } fn with_timeout( @@ -104,9 +110,10 @@ where if !Cmd::EXPECTS_RESPONSE_CODE { cmd.parse(Ok(&[])) } else { - let response = - self.receive_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; - cmd.parse((&response).into()) + self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; + let response = self.res_slot.get(); + let response: &Response = &response.borrow(); + cmd.parse(response.into()) } } } @@ -115,7 +122,7 @@ where mod test { use super::*; use crate::atat_derive::{AtatCmd, AtatEnum, AtatResp, AtatUrc}; - use crate::{self as atat, InternalError, Response}; + use crate::{self as atat, InternalError}; use core::sync::atomic::{AtomicU64, Ordering}; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::pubsub::PubSubChannel; @@ -265,7 +272,8 @@ mod test { let sent = tokio::spawn(async move { tx.next_message_pure().await; - rx.signal(Err(InternalError::Error).into()); + rx.signal_response(Err(InternalError::Error).into()) + .unwrap(); }); tokio::task::spawn_blocking(move || { @@ -288,7 +296,8 @@ mod test { let sent = tokio::spawn(async move { tx.next_message_pure().await; - rx.signal(Err(InternalError::Error).into()); + rx.signal_response(Err(InternalError::Error).into()) + .unwrap(); }); tokio::task::spawn_blocking(move || { @@ -316,10 +325,10 @@ mod test { let sent = tokio::spawn(async move { let sent0 = tx.next_message_pure().await; - rx.signal(Response::default()); + rx.signal_response(Ok(&[])).unwrap(); let sent1 = tx.next_message_pure().await; - rx.signal(Response::default()); + rx.signal_response(Ok(&[])).unwrap(); (sent0, sent1) }); @@ -347,7 +356,7 @@ mod test { let sent = tokio::spawn(async move { let sent = tx.next_message_pure().await; - rx.signal(Response::default()); + rx.signal_response(Ok(&[])).unwrap(); sent }); @@ -382,10 +391,10 @@ mod test { let sent = tokio::spawn(async move { let sent0 = tx.next_message_pure().await; - rx.signal(Response::ok(response0)); + rx.signal_response(Ok(response0)).unwrap(); let sent1 = tx.next_message_pure().await; - rx.signal(Response::ok(response1)); + rx.signal_response(Ok(response1)).unwrap(); (sent0, sent1) }); @@ -426,7 +435,7 @@ mod test { let sent = tokio::spawn(async move { tx.next_message_pure().await; - rx.signal(Response::ok(b"+CUN: 22,16,22")); + rx.signal_response(Ok(b"+CUN: 22,16,22")).unwrap(); }); tokio::task::spawn_blocking(move || { @@ -511,7 +520,7 @@ mod test { tx.next_message_pure().await; // Emit response in the extended timeout timeframe Timer::after(Duration::from_millis(300)).await; - rx.signal(Response::default()); + rx.signal_response(Ok(&[])).unwrap(); }); tokio::task::spawn_blocking(move || { diff --git a/atat/src/ingress.rs b/atat/src/ingress.rs index 6e839dd3..57c10948 100644 --- a/atat/src/ingress.rs +++ b/atat/src/ingress.rs @@ -1,11 +1,12 @@ use crate::{ - helpers::LossyStr, urc_channel::UrcPublisher, AtatUrc, DigestResult, Digester, Response, - ResponseSlot, UrcChannel, + helpers::LossyStr, urc_channel::UrcPublisher, AtatUrc, DigestResult, Digester, ResponseSlot, + UrcChannel, }; #[derive(Debug, PartialEq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum Error { + ResponseSlotBusy, UrcChannelFull, } @@ -149,7 +150,10 @@ impl< (DigestResult::Prompt(prompt), swallowed) => { debug!("Received prompt ({}/{})", swallowed, self.pos); - self.res_slot.signal(Response::Prompt(prompt)); + if self.res_slot.signal_prompt(prompt).is_err() { + error!("Received prompt but a response is already pending"); + } + swallowed } (DigestResult::Urc(urc_line), swallowed) => { @@ -192,7 +196,9 @@ impl< } } - self.res_slot.signal(resp.into()); + if self.res_slot.signal_response(resp).is_err() { + error!("Received response but a response is already pending"); + } swallowed } }; @@ -230,7 +236,9 @@ impl< (DigestResult::Prompt(prompt), swallowed) => { debug!("Received prompt ({}/{})", swallowed, self.pos); - self.res_slot.signal(Response::Prompt(prompt)); + if self.res_slot.signal_prompt(prompt).is_err() { + error!("Received prompt but a response is already pending"); + } swallowed } (DigestResult::Urc(urc_line), swallowed) => { @@ -273,7 +281,9 @@ impl< } } - self.res_slot.signal(resp.into()); + if self.res_slot.signal_response(resp).is_err() { + error!("Received response but a response is already pending"); + } swallowed } }; @@ -295,7 +305,8 @@ impl< #[cfg(test)] mod tests { use crate::{ - self as atat, atat_derive::AtatUrc, response_slot::ResponseSlot, AtDigester, UrcChannel, + self as atat, atat_derive::AtatUrc, response_slot::ResponseSlot, AtDigester, Response, + UrcChannel, }; use super::*; @@ -325,7 +336,8 @@ mod tests { assert_eq!(Urc::ConnectOk, sub.try_next_message_pure().unwrap()); assert_eq!(Urc::ConnectFail, sub.try_next_message_pure().unwrap()); - let response = res_slot.try_take().unwrap(); - assert_eq!(Response::default(), response); + let response = res_slot.get(); + let response: &Response<100> = &response.borrow(); + assert_eq!(&Response::default(), response); } } diff --git a/atat/src/response_slot.rs b/atat/src/response_slot.rs index d7100768..1cd24330 100644 --- a/atat/src/response_slot.rs +++ b/atat/src/response_slot.rs @@ -1,6 +1,74 @@ -use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, signal::Signal}; +use core::cell::RefCell; +use embassy_sync::{ + blocking_mutex::raw::CriticalSectionRawMutex, + mutex::{Mutex, MutexGuard}, + signal::Signal, +}; +use heapless::Vec; -use crate::Response; +use crate::{InternalError, Response}; -pub type ResponseSlot = - Signal>; +pub struct ResponseSlot( + Mutex>>, + Signal, +); + +pub type ResponseSlotGuard<'a, const N: usize> = + MutexGuard<'a, CriticalSectionRawMutex, RefCell>>; + +#[derive(Debug)] +pub struct SlotInUseError; + +impl ResponseSlot { + pub const fn new() -> Self { + Self( + Mutex::new(RefCell::new(Response::Ok(Vec::new()))), + Signal::new(), + ) + } + + /// Reset the current response slot + pub fn reset(&self) { + self.1.reset(); + } + + /// Wait for a response to become available + pub async fn wait(&self) { + self.1.wait().await + } + + /// Get whether a response is available + pub fn available(&self) -> bool { + self.1.signaled() + } + + /// Get a guard to response + pub fn get<'a>(&'a self) -> ResponseSlotGuard<'a, N> { + self.0.try_lock().unwrap() + } + + pub(crate) fn signal_prompt(&self, prompt: u8) -> Result<(), SlotInUseError> { + if self.1.signaled() { + return Err(SlotInUseError); + } + let buf = self.0.try_lock().unwrap(); + let mut res = buf.borrow_mut(); + *res = Response::Prompt(prompt); + self.1.signal(()); + Ok(()) + } + + pub(crate) fn signal_response( + &self, + response: Result<&[u8], InternalError>, + ) -> Result<(), SlotInUseError> { + if self.1.signaled() { + return Err(SlotInUseError); + } + let buf = self.0.try_lock().unwrap(); + let mut res = buf.borrow_mut(); + *res = response.into(); + self.1.signal(()); + Ok(()) + } +} From 408b8408a189acd196230cfc98677e61636f54ed Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 11:00:37 +0100 Subject: [PATCH 09/23] Fix missing float formatting in serde_at --- .github/workflows/ci.yml | 2 +- examples/Cargo.toml | 2 +- examples/src/bin/embassy.rs | 16 ++----- examples/src/bin/std-tokio.rs | 4 +- serde_at/src/ser/mod.rs | 85 +++++++++++++++++++++++++++++------ 5 files changed, 79 insertions(+), 30 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index da700bfe..f9469bd9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -142,4 +142,4 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: --all --features std \ No newline at end of file + args: --all --features std heapless \ No newline at end of file diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 35e13493..9397f666 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -49,7 +49,7 @@ tokio = { version = "1.26", default-features = false, features = [ "macros", ], optional = true } tokio-serial = { version = "5.4.4", optional = true } -static_cell = "2" +static_cell = { version = "2", features = ["nightly"] } [features] embedded = [ diff --git a/examples/src/bin/embassy.rs b/examples/src/bin/embassy.rs index a294969a..be3af143 100644 --- a/examples/src/bin/embassy.rs +++ b/examples/src/bin/embassy.rs @@ -9,7 +9,6 @@ use atat::{ }; use atat_examples::common; use embassy_executor::Spawner; -use embassy_executor::_export::StaticCell; use embassy_rp::{ interrupt, peripherals::UART0, @@ -17,15 +16,6 @@ use embassy_rp::{ }; use {defmt_rtt as _, panic_probe as _}; -macro_rules! singleton { - ($val:expr) => {{ - type T = impl Sized; - static STATIC_CELL: StaticCell = StaticCell::new(); - let (x,) = STATIC_CELL.init(($val,)); - x - }}; -} - const INGRESS_BUF_SIZE: usize = 1024; const URC_CAPACITY: usize = 128; const URC_SUBSCRIBERS: usize = 3; @@ -37,8 +27,8 @@ async fn main(spawner: Spawner) { let (tx_pin, rx_pin, uart) = (p.PIN_0, p.PIN_1, p.UART0); let irq = interrupt::take!(UART0_IRQ); - let tx_buf = &mut singleton!([0u8; 16])[..]; - let rx_buf = &mut singleton!([0u8; 16])[..]; + let tx_buf = static_cell::make_static!([0u8; 16]); + let rx_buf = static_cell::make_static!([0u8; 16]); let uart = BufferedUart::new( uart, irq, @@ -57,7 +47,7 @@ async fn main(spawner: Spawner) { &RES_SLOT, &URC_CHANNEL, ); - let buf = StaticCell::make_static!([0; 1024]); + let buf = static_cell::make_static!([0; 1024]); let mut client = Client::new(writer, &RES_SLOT, buf, atat::Config::default()); spawner.spawn(ingress_task(ingress, reader)).unwrap(); diff --git a/examples/src/bin/std-tokio.rs b/examples/src/bin/std-tokio.rs index f71339ca..d9ab83b6 100644 --- a/examples/src/bin/std-tokio.rs +++ b/examples/src/bin/std-tokio.rs @@ -1,4 +1,4 @@ -#![feature(async_fn_in_trait)] +#![feature(type_alias_impl_trait)] #![allow(incomplete_features)] use atat_examples::common; @@ -27,7 +27,7 @@ async fn main() -> ! { &RES_SLOT, &URC_CHANNEL, ); - let buf = StaticCell::make_static!([0; 1024]); + let buf = static_cell::make_static!([0; 1024]); let mut client = Client::new(FromTokio::new(writer), &RES_SLOT, buf, Config::default()); tokio::spawn(ingress_task(ingress, FromTokio::new(reader))); diff --git a/serde_at/src/ser/mod.rs b/serde_at/src/ser/mod.rs index a1b8b7af..d1e85e0b 100644 --- a/serde_at/src/ser/mod.rs +++ b/serde_at/src/ser/mod.rs @@ -100,6 +100,19 @@ impl<'a> Serializer<'a> { Err(Error::BufferFull) } } + + fn write_buf(&mut self) -> &mut [u8] { + &mut self.buf[self.written..] + } + + fn commit(&mut self, amount: usize) -> Result<()> { + if self.written + amount <= self.buf.len() { + self.written += amount; + Ok(()) + } else { + Err(Error::BufferFull) + } + } } // NOTE(serialize_*signed) This is basically the numtoa implementation minus the lookup tables, @@ -166,21 +179,49 @@ macro_rules! serialize_signed { }}; } -#[cfg(feature = "heapless")] -macro_rules! serialize_fmt { - ($self:ident, $N:expr, $fmt:expr, $v:expr) => {{ - use fmt::Write; - let mut s: heapless::String<$N> = heapless::String::new(); - write!(&mut s, $fmt, $v).unwrap(); - $self.extend_from_slice(s.as_bytes())?; +struct FmtWrapper<'a> { + buf: &'a mut [u8], + offset: usize, +} + +impl<'a> FmtWrapper<'a> { + fn new(buf: &'a mut [u8]) -> Self { + FmtWrapper { + buf: buf, + offset: 0, + } + } +} + +impl<'a> fmt::Write for FmtWrapper<'a> { + fn write_str(&mut self, s: &str) -> fmt::Result { + let bytes = s.as_bytes(); + + // Skip over already-copied data + let remainder = &mut self.buf[self.offset..]; + // Check if there is space remaining (return error instead of panicking) + if remainder.len() < bytes.len() { + return Err(core::fmt::Error); + } + // Make the two slices the same length + let remainder = &mut remainder[..bytes.len()]; + // Copy + remainder.copy_from_slice(bytes); + + // Update offset to avoid overwriting + self.offset += bytes.len(); + Ok(()) - }}; + } } -#[cfg(not(feature = "heapless"))] macro_rules! serialize_fmt { - ($self:ident, $N:expr, $fmt:expr, $v:expr) => {{ - todo!() + ($self:ident, $fmt:expr, $v:expr) => {{ + use fmt::Write; + let mut wrapper = FmtWrapper::new($self.write_buf()); + write!(wrapper, $fmt, $v).unwrap(); + let written = wrapper.offset; + $self.commit(written) }}; } @@ -246,11 +287,11 @@ impl<'a, 'b> ser::Serializer for &'a mut Serializer<'b> { } fn serialize_f32(self, v: f32) -> Result { - serialize_fmt!(self, 16, "{:e}", v) + serialize_fmt!(self, "{}", v) } fn serialize_f64(self, v: f64) -> Result { - serialize_fmt!(self, 32, "{:e}", v) + serialize_fmt!(self, "{}", v) } fn serialize_char(self, v: char) -> Result { @@ -650,6 +691,24 @@ mod tests { assert_eq!(s, String::<32>::try_from("AT+CMD=\"value\"\r\n").unwrap()); } + #[test] + fn fmt_float() { + #[derive(Clone, PartialEq, Serialize)] + pub struct Floats { + f32: f32, + f64: f64, + } + + let value = Floats { + f32: 1.23, + f64: 4.56, + }; + + let s: String<32> = to_string(&value, "+CMD", SerializeOptions::default()).unwrap(); + + assert_eq!(s, String::<32>::try_from("AT+CMD=1.23,4.56\r\n").unwrap()); + } + #[test] fn hex_str_serialize() { #[derive(Clone, PartialEq, Serialize)] From eb1ccebc02af68c9a57ef36b6ac731cb6e6ef6cd Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 11:04:04 +0100 Subject: [PATCH 10/23] Add static_cell std for examples --- examples/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 9397f666..411a84ac 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -72,4 +72,5 @@ std = [ "critical-section/std", "embedded-io-adapters", "embedded-io-adapters/tokio-1", + "static_cell/std", ] From 8792ee9091d26592d33b16f03a8fff78d79d8fde Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 11:07:01 +0100 Subject: [PATCH 11/23] std should be on portable-atomic --- examples/Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 411a84ac..992a0951 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -50,6 +50,7 @@ tokio = { version = "1.26", default-features = false, features = [ ], optional = true } tokio-serial = { version = "5.4.4", optional = true } static_cell = { version = "2", features = ["nightly"] } +portable-atomic = "1.6.0" [features] embedded = [ @@ -72,5 +73,5 @@ std = [ "critical-section/std", "embedded-io-adapters", "embedded-io-adapters/tokio-1", - "static_cell/std", + "portable-atomic/std", ] From 76b6fe454ef6c621d93ce8fd0ca71c19c9e1f054 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 11:10:04 +0100 Subject: [PATCH 12/23] Add critical-section to portablable-atomic features for embedded example --- examples/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 992a0951..5b88f354 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -62,6 +62,7 @@ embedded = [ "dep:embassy-executor", "embassy-rp?/defmt", "atat/defmt", + "portable-atomic/critical-section", ] std = [ "dep:env_logger", From b6757aeb21be6f67cf1ac7151ec829df483c3e33 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 11:19:49 +0100 Subject: [PATCH 13/23] Always set embedded feature for thumb target --- .github/workflows/ci.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f9469bd9..75eaab4d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -103,10 +103,12 @@ jobs: include: - target: "x86_64-unknown-linux-gnu" features: "derive" - std: ", std" + std: ",std" - target: "x86_64-unknown-linux-gnu" features: "derive, log" - std: ", std" + std: ",std" + - target: "thumbv6m-none-eabi" + embedded: ",embedded" steps: - name: Checkout source code uses: actions/checkout@v2 @@ -123,7 +125,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - args: --all --target '${{ matrix.target }}' --features '${{ matrix.features }}${{ matrix.std }}' + args: --all --target '${{ matrix.target }}' --features '${{ matrix.features }}${{ matrix.std }}${{ matrix.embedded }}' test: name: Test From 4326ae2e5e404142a7ba93e875724c1b85abf716 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 11:33:50 +0100 Subject: [PATCH 14/23] Update ci.yml --- .github/workflows/ci.yml | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 75eaab4d..223a1a1f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,18 +97,14 @@ jobs: [ "", "derive", - "derive, log", - "derive, defmt", ] include: - target: "x86_64-unknown-linux-gnu" - features: "derive" - std: ",std" + extra_features: "std,log" - target: "x86_64-unknown-linux-gnu" - features: "derive, log" - std: ",std" + extra_features: "std,defmt" - target: "thumbv6m-none-eabi" - embedded: ",embedded" + extra_features: "embedded" steps: - name: Checkout source code uses: actions/checkout@v2 @@ -125,7 +121,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - args: --all --target '${{ matrix.target }}' --features '${{ matrix.features }}${{ matrix.std }}${{ matrix.embedded }}' + args: --all --target '${{ matrix.target }}' --features '${{ matrix.features }},${{ matrix.extra_features }}' test: name: Test From 33a7bf2437d4d070b4494ce1ee7dc92d08bd8a87 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 11:49:23 +0100 Subject: [PATCH 15/23] Update ci.yml --- .github/workflows/ci.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 223a1a1f..84d1dbf7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,6 +97,8 @@ jobs: [ "", "derive", + "async", + "derive,async", ] include: - target: "x86_64-unknown-linux-gnu" @@ -121,7 +123,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - args: --all --target '${{ matrix.target }}' --features '${{ matrix.features }},${{ matrix.extra_features }}' + args: --workspace --target '${{ matrix.target }}' --features '${{ matrix.features }},${{ matrix.extra_features }}' test: name: Test @@ -140,4 +142,4 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: --all --features std heapless \ No newline at end of file + args: --workspace --features std,heapless \ No newline at end of file From d4f079f72c16374a39518d297382cc536b83b428 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 11:51:51 +0100 Subject: [PATCH 16/23] Update ci.yml --- .github/workflows/ci.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84d1dbf7..f6d61b3c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -103,10 +103,8 @@ jobs: include: - target: "x86_64-unknown-linux-gnu" extra_features: "std,log" - - target: "x86_64-unknown-linux-gnu" - extra_features: "std,defmt" - target: "thumbv6m-none-eabi" - extra_features: "embedded" + extra_features: "embedded,defmt" steps: - name: Checkout source code uses: actions/checkout@v2 From 4d67fcd09da6a8b60d7cc65c0a350e2c8cf6039c Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 12:11:20 +0100 Subject: [PATCH 17/23] Simplify ResponseSlot and ensure that lock is not taken when signal is emitted --- atat/src/asynch/client.rs | 6 ++--- atat/src/blocking/client.rs | 12 +++------ atat/src/response_slot.rs | 49 +++++++++++++++++++++++++------------ 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index 8ae7321e..7d3da310 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -4,7 +4,7 @@ use embassy_time::{Duration, Instant, TimeoutError, Timer}; use embedded_io_async::Write; use futures::{ future::{select, Either}, - pin_mut, Future, + pin_mut, Future, FutureExt, }; pub struct Client<'a, W: Write, const INGRESS_BUF_SIZE: usize> { @@ -55,7 +55,7 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE } async fn wait_response(&mut self, timeout: Duration) -> Result<(), Error> { - self.with_timeout(timeout, self.res_slot.wait()) + self.with_timeout(timeout, self.res_slot.wait().map(|_| ())) .await .map_err(|_| Error::Timeout) } @@ -105,7 +105,7 @@ impl AtatClient for Client<'_, W, INGRE } else { self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) .await?; - let response = self.res_slot.get(); + let response = self.res_slot.try_get().unwrap(); let response: &Response = &response.borrow(); cmd.parse(response.into()) } diff --git a/atat/src/blocking/client.rs b/atat/src/blocking/client.rs index 844808ed..ae05bfae 100644 --- a/atat/src/blocking/client.rs +++ b/atat/src/blocking/client.rs @@ -62,14 +62,8 @@ where } fn wait_response(&mut self, timeout: Duration) -> Result<(), Error> { - self.with_timeout(timeout, || { - if self.res_slot.available() { - Some(()) - } else { - None - } - }) - .map_err(|_| Error::Timeout) + self.with_timeout(timeout, || self.res_slot.try_get().map(|_| ())) + .map_err(|_| Error::Timeout) } fn with_timeout( @@ -111,7 +105,7 @@ where cmd.parse(Ok(&[])) } else { self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; - let response = self.res_slot.get(); + let response = self.res_slot.try_get().unwrap(); let response: &Response = &response.borrow(); cmd.parse(response.into()) } diff --git a/atat/src/response_slot.rs b/atat/src/response_slot.rs index 1cd24330..3fd43aa8 100644 --- a/atat/src/response_slot.rs +++ b/atat/src/response_slot.rs @@ -32,28 +32,38 @@ impl ResponseSlot { self.1.reset(); } - /// Wait for a response to become available - pub async fn wait(&self) { - self.1.wait().await - } + /// Wait for a response to become available and get a guard to the response + pub async fn wait<'a>(&'a self) -> ResponseSlotGuard<'a, N> { + self.1.wait().await; - /// Get whether a response is available - pub fn available(&self) -> bool { - self.1.signaled() + // The mutex is not locked when signal is emitted + self.0.try_lock().unwrap() } - /// Get a guard to response - pub fn get<'a>(&'a self) -> ResponseSlotGuard<'a, N> { - self.0.try_lock().unwrap() + /// Wait for a response to become available and get a guard to the response + pub fn try_get<'a>(&'a self) -> Option> { + if self.1.signaled() { + // The mutex is not locked when signal is emitted + Some(self.0.try_lock().unwrap()) + } else { + None + } } pub(crate) fn signal_prompt(&self, prompt: u8) -> Result<(), SlotInUseError> { if self.1.signaled() { return Err(SlotInUseError); } - let buf = self.0.try_lock().unwrap(); - let mut res = buf.borrow_mut(); - *res = Response::Prompt(prompt); + + // Not currently signaled: We know that the client is not currently holding the response slot guard + + { + let buf = self.0.try_lock().unwrap(); + let mut res = buf.borrow_mut(); + *res = Response::Prompt(prompt); + } + + // Mutex is unlocked before we signal self.1.signal(()); Ok(()) } @@ -65,9 +75,16 @@ impl ResponseSlot { if self.1.signaled() { return Err(SlotInUseError); } - let buf = self.0.try_lock().unwrap(); - let mut res = buf.borrow_mut(); - *res = response.into(); + + // Not currently signaled: We know that the client is not currently holding the response slot guard + + { + let buf = self.0.try_lock().unwrap(); + let mut res = buf.borrow_mut(); + *res = response.into(); + } + + // Mutex is unlocked before we signal self.1.signal(()); Ok(()) } From 97dbfecedfdea11523b850047333950cdcd38ae6 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 12:59:47 +0100 Subject: [PATCH 18/23] Fix compile error --- atat/src/ingress.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atat/src/ingress.rs b/atat/src/ingress.rs index 57c10948..dfd0ec4b 100644 --- a/atat/src/ingress.rs +++ b/atat/src/ingress.rs @@ -336,7 +336,7 @@ mod tests { assert_eq!(Urc::ConnectOk, sub.try_next_message_pure().unwrap()); assert_eq!(Urc::ConnectFail, sub.try_next_message_pure().unwrap()); - let response = res_slot.get(); + let response = res_slot.try_get().unwrap(); let response: &Response<100> = &response.borrow(); assert_eq!(&Response::default(), response); } From 16b8e9868c53908ebcac8fa536bfbb21be1ee589 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 13:31:14 +0100 Subject: [PATCH 19/23] Remove client::send explicit lifetimes --- atat/src/asynch/client.rs | 6 +++--- atat/src/asynch/mod.rs | 7 ++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index 7d3da310..9664f867 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -97,7 +97,7 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE } impl AtatClient for Client<'_, W, INGRESS_BUF_SIZE> { - async fn send<'a, Cmd: AtatCmd>(&'a mut self, cmd: &'a Cmd) -> Result { + async fn send(&mut self, cmd: &Cmd) -> Result { let len = cmd.write(&mut self.buf); self.send_request(len).await?; if !Cmd::EXPECTS_RESPONSE_CODE { @@ -203,7 +203,7 @@ mod tests { // Do not emit a response effectively causing a timeout }); - let send = tokio::task::spawn(async move { + let send = tokio::spawn(async move { assert_eq!(Err(Error::Timeout), client.send(&cmd).await); }); @@ -251,7 +251,7 @@ mod tests { rx.signal_response(Ok(&[])).unwrap(); }); - let send = tokio::task::spawn(async move { + let send = tokio::spawn(async move { assert_eq!(Ok(NoResponse), client.send(&cmd).await); }); diff --git a/atat/src/asynch/mod.rs b/atat/src/asynch/mod.rs index 96976926..d65b1e57 100644 --- a/atat/src/asynch/mod.rs +++ b/atat/src/asynch/mod.rs @@ -12,12 +12,9 @@ pub trait AtatClient { /// This function will also make sure that at least `self.config.cmd_cooldown` /// has passed since the last response or URC has been received, to allow /// the slave AT device time to deliver URC's. - async fn send<'a, Cmd: AtatCmd>(&'a mut self, cmd: &'a Cmd) -> Result; + async fn send(&mut self, cmd: &Cmd) -> Result; - async fn send_retry<'a, Cmd: AtatCmd>( - &'a mut self, - cmd: &'a Cmd, - ) -> Result { + async fn send_retry(&mut self, cmd: &Cmd) -> Result { for attempt in 1..=Cmd::ATTEMPTS { if attempt > 1 { debug!("Attempt {}:", attempt); From d8ed5aa2037357d77b982bfa5b4b58aa65b51e40 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 13:56:35 +0100 Subject: [PATCH 20/23] For now, remove tests that cannot compile --- atat/src/asynch/client.rs | 172 +++++++++++++++++++------------------- 1 file changed, 86 insertions(+), 86 deletions(-) diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index 9664f867..e82bdbe0 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -175,90 +175,90 @@ mod tests { }}; } - #[tokio::test] - async fn custom_timeout() { - static CALL_COUNT: AtomicU64 = AtomicU64::new(0); - - fn custom_response_timeout(sent: Instant, timeout: Duration) -> Instant { - CALL_COUNT.fetch_add(1, Ordering::Relaxed); - assert_eq!( - Duration::from_millis(SetModuleFunctionality::MAX_TIMEOUT_MS.into()), - timeout - ); - // Effectively ignoring the timeout configured for the command - // The default response timeout is "sent + timeout" - sent + Duration::from_millis(100) - } - - let (mut client, mut tx, _rx) = - setup!(Config::new().get_response_timeout(custom_response_timeout)); - - let cmd = SetModuleFunctionality { - fun: Functionality::APM, - rst: Some(ResetMode::DontReset), - }; - - let sent = tokio::spawn(async move { - tx.next_message_pure().await; - // Do not emit a response effectively causing a timeout - }); - - let send = tokio::spawn(async move { - assert_eq!(Err(Error::Timeout), client.send(&cmd).await); - }); - - let (sent, send) = join!(sent, send); - sent.unwrap(); - send.unwrap(); - - assert_ne!(0, CALL_COUNT.load(Ordering::Relaxed)); - } - - #[tokio::test] - async fn custom_timeout_modified_during_request() { - static CALL_COUNT: AtomicU64 = AtomicU64::new(0); - - fn custom_response_timeout(sent: Instant, timeout: Duration) -> Instant { - CALL_COUNT.fetch_add(1, Ordering::Relaxed); - assert_eq!( - Duration::from_millis(SetModuleFunctionality::MAX_TIMEOUT_MS.into()), - timeout - ); - // Effectively ignoring the timeout configured for the command - // The default response timeout is "sent + timeout" - // Let the timeout instant be extended depending on the current time - if Instant::now() < sent + Duration::from_millis(100) { - // Initial timeout - sent + Duration::from_millis(200) - } else { - // Extended timeout - sent + Duration::from_millis(500) - } - } - - let (mut client, mut tx, rx) = - setup!(Config::new().get_response_timeout(custom_response_timeout)); - - let cmd = SetModuleFunctionality { - fun: Functionality::APM, - rst: Some(ResetMode::DontReset), - }; - - let sent = tokio::spawn(async move { - tx.next_message_pure().await; - // Emit response in the extended timeout timeframe - Timer::after(Duration::from_millis(300)).await; - rx.signal_response(Ok(&[])).unwrap(); - }); - - let send = tokio::spawn(async move { - assert_eq!(Ok(NoResponse), client.send(&cmd).await); - }); - - let (sent, send) = join!(sent, send); - sent.unwrap(); - send.unwrap(); - - assert_ne!(0, CALL_COUNT.load(Ordering::Relaxed)); - } + // #[tokio::test] + // async fn custom_timeout() { + // static CALL_COUNT: AtomicU64 = AtomicU64::new(0); + + // fn custom_response_timeout(sent: Instant, timeout: Duration) -> Instant { + // CALL_COUNT.fetch_add(1, Ordering::Relaxed); + // assert_eq!( + // Duration::from_millis(SetModuleFunctionality::MAX_TIMEOUT_MS.into()), + // timeout + // ); + // // Effectively ignoring the timeout configured for the command + // // The default response timeout is "sent + timeout" + // sent + Duration::from_millis(100) + // } + + // let (mut client, mut tx, _rx) = + // setup!(Config::new().get_response_timeout(custom_response_timeout)); + + // let cmd = SetModuleFunctionality { + // fun: Functionality::APM, + // rst: Some(ResetMode::DontReset), + // }; + + // let sent = tokio::spawn(async move { + // tx.next_message_pure().await; + // // Do not emit a response effectively causing a timeout + // }); + + // let send = tokio::spawn(async move { + // assert_eq!(Err(Error::Timeout), client.send(&cmd).await); + // }); + + // let (sent, send) = join!(sent, send); + // sent.unwrap(); + // send.unwrap(); + + // assert_ne!(0, CALL_COUNT.load(Ordering::Relaxed)); + // } + + // #[tokio::test] + // async fn custom_timeout_modified_during_request() { + // static CALL_COUNT: AtomicU64 = AtomicU64::new(0); + + // fn custom_response_timeout(sent: Instant, timeout: Duration) -> Instant { + // CALL_COUNT.fetch_add(1, Ordering::Relaxed); + // assert_eq!( + // Duration::from_millis(SetModuleFunctionality::MAX_TIMEOUT_MS.into()), + // timeout + // ); + // // Effectively ignoring the timeout configured for the command + // // The default response timeout is "sent + timeout" + // // Let the timeout instant be extended depending on the current time + // if Instant::now() < sent + Duration::from_millis(100) { + // // Initial timeout + // sent + Duration::from_millis(200) + // } else { + // // Extended timeout + // sent + Duration::from_millis(500) + // } + // } + + // let (mut client, mut tx, rx) = + // setup!(Config::new().get_response_timeout(custom_response_timeout)); + + // let cmd = SetModuleFunctionality { + // fun: Functionality::APM, + // rst: Some(ResetMode::DontReset), + // }; + + // let sent = tokio::spawn(async move { + // tx.next_message_pure().await; + // // Emit response in the extended timeout timeframe + // Timer::after(Duration::from_millis(300)).await; + // rx.signal_response(Ok(&[])).unwrap(); + // }); + + // let send = tokio::spawn(async move { + // assert_eq!(Ok(NoResponse), client.send(&cmd).await); + // }); + + // let (sent, send) = join!(sent, send); + // sent.unwrap(); + // send.unwrap(); + + // assert_ne!(0, CALL_COUNT.load(Ordering::Relaxed)); + // } } From ac99750723f31b23b6f5f3a735d845a644c3bfc8 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 14:52:27 +0100 Subject: [PATCH 21/23] Fix test compile error --- atat/src/asynch/client.rs | 177 ++++++++++++++++++------------------ atat/src/blocking/client.rs | 3 +- atat/src/response_slot.rs | 16 +++- 3 files changed, 103 insertions(+), 93 deletions(-) diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index e82bdbe0..803ef43e 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -4,7 +4,7 @@ use embassy_time::{Duration, Instant, TimeoutError, Timer}; use embedded_io_async::Write; use futures::{ future::{select, Either}, - pin_mut, Future, FutureExt, + pin_mut, Future, }; pub struct Client<'a, W: Write, const INGRESS_BUF_SIZE: usize> { @@ -55,7 +55,7 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE } async fn wait_response(&mut self, timeout: Duration) -> Result<(), Error> { - self.with_timeout(timeout, self.res_slot.wait().map(|_| ())) + self.with_timeout(timeout, self.res_slot.wait()) .await .map_err(|_| Error::Timeout) } @@ -105,6 +105,7 @@ impl AtatClient for Client<'_, W, INGRE } else { self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) .await?; + assert!(self.res_slot.signaled()); let response = self.res_slot.try_get().unwrap(); let response: &Response = &response.borrow(); cmd.parse(response.into()) @@ -175,90 +176,90 @@ mod tests { }}; } - // #[tokio::test] - // async fn custom_timeout() { - // static CALL_COUNT: AtomicU64 = AtomicU64::new(0); - - // fn custom_response_timeout(sent: Instant, timeout: Duration) -> Instant { - // CALL_COUNT.fetch_add(1, Ordering::Relaxed); - // assert_eq!( - // Duration::from_millis(SetModuleFunctionality::MAX_TIMEOUT_MS.into()), - // timeout - // ); - // // Effectively ignoring the timeout configured for the command - // // The default response timeout is "sent + timeout" - // sent + Duration::from_millis(100) - // } - - // let (mut client, mut tx, _rx) = - // setup!(Config::new().get_response_timeout(custom_response_timeout)); - - // let cmd = SetModuleFunctionality { - // fun: Functionality::APM, - // rst: Some(ResetMode::DontReset), - // }; - - // let sent = tokio::spawn(async move { - // tx.next_message_pure().await; - // // Do not emit a response effectively causing a timeout - // }); - - // let send = tokio::spawn(async move { - // assert_eq!(Err(Error::Timeout), client.send(&cmd).await); - // }); - - // let (sent, send) = join!(sent, send); - // sent.unwrap(); - // send.unwrap(); - - // assert_ne!(0, CALL_COUNT.load(Ordering::Relaxed)); - // } - - // #[tokio::test] - // async fn custom_timeout_modified_during_request() { - // static CALL_COUNT: AtomicU64 = AtomicU64::new(0); - - // fn custom_response_timeout(sent: Instant, timeout: Duration) -> Instant { - // CALL_COUNT.fetch_add(1, Ordering::Relaxed); - // assert_eq!( - // Duration::from_millis(SetModuleFunctionality::MAX_TIMEOUT_MS.into()), - // timeout - // ); - // // Effectively ignoring the timeout configured for the command - // // The default response timeout is "sent + timeout" - // // Let the timeout instant be extended depending on the current time - // if Instant::now() < sent + Duration::from_millis(100) { - // // Initial timeout - // sent + Duration::from_millis(200) - // } else { - // // Extended timeout - // sent + Duration::from_millis(500) - // } - // } - - // let (mut client, mut tx, rx) = - // setup!(Config::new().get_response_timeout(custom_response_timeout)); - - // let cmd = SetModuleFunctionality { - // fun: Functionality::APM, - // rst: Some(ResetMode::DontReset), - // }; - - // let sent = tokio::spawn(async move { - // tx.next_message_pure().await; - // // Emit response in the extended timeout timeframe - // Timer::after(Duration::from_millis(300)).await; - // rx.signal_response(Ok(&[])).unwrap(); - // }); - - // let send = tokio::spawn(async move { - // assert_eq!(Ok(NoResponse), client.send(&cmd).await); - // }); - - // let (sent, send) = join!(sent, send); - // sent.unwrap(); - // send.unwrap(); - - // assert_ne!(0, CALL_COUNT.load(Ordering::Relaxed)); - // } + #[tokio::test] + async fn custom_timeout() { + static CALL_COUNT: AtomicU64 = AtomicU64::new(0); + + fn custom_response_timeout(sent: Instant, timeout: Duration) -> Instant { + CALL_COUNT.fetch_add(1, Ordering::Relaxed); + assert_eq!( + Duration::from_millis(SetModuleFunctionality::MAX_TIMEOUT_MS.into()), + timeout + ); + // Effectively ignoring the timeout configured for the command + // The default response timeout is "sent + timeout" + sent + Duration::from_millis(100) + } + + let (mut client, mut tx, _rx) = + setup!(Config::new().get_response_timeout(custom_response_timeout)); + + let cmd = SetModuleFunctionality { + fun: Functionality::APM, + rst: Some(ResetMode::DontReset), + }; + + let sent = tokio::spawn(async move { + tx.next_message_pure().await; + // Do not emit a response effectively causing a timeout + }); + + let send = tokio::spawn(async move { + assert_eq!(Err(Error::Timeout), client.send(&cmd).await); + }); + + let (sent, send) = join!(sent, send); + sent.unwrap(); + send.unwrap(); + + assert_ne!(0, CALL_COUNT.load(Ordering::Relaxed)); + } + + #[tokio::test] + async fn custom_timeout_modified_during_request() { + static CALL_COUNT: AtomicU64 = AtomicU64::new(0); + + fn custom_response_timeout(sent: Instant, timeout: Duration) -> Instant { + CALL_COUNT.fetch_add(1, Ordering::Relaxed); + assert_eq!( + Duration::from_millis(SetModuleFunctionality::MAX_TIMEOUT_MS.into()), + timeout + ); + // Effectively ignoring the timeout configured for the command + // The default response timeout is "sent + timeout" + // Let the timeout instant be extended depending on the current time + if Instant::now() < sent + Duration::from_millis(100) { + // Initial timeout + sent + Duration::from_millis(200) + } else { + // Extended timeout + sent + Duration::from_millis(500) + } + } + + let (mut client, mut tx, rx) = + setup!(Config::new().get_response_timeout(custom_response_timeout)); + + let cmd = SetModuleFunctionality { + fun: Functionality::APM, + rst: Some(ResetMode::DontReset), + }; + + let sent = tokio::spawn(async move { + tx.next_message_pure().await; + // Emit response in the extended timeout timeframe + Timer::after(Duration::from_millis(300)).await; + rx.signal_response(Ok(&[])).unwrap(); + }); + + let send = tokio::spawn(async move { + assert_eq!(Ok(NoResponse), client.send(&cmd).await); + }); + + let (sent, send) = join!(sent, send); + sent.unwrap(); + send.unwrap(); + + assert_ne!(0, CALL_COUNT.load(Ordering::Relaxed)); + } } diff --git a/atat/src/blocking/client.rs b/atat/src/blocking/client.rs index ae05bfae..8d1baf27 100644 --- a/atat/src/blocking/client.rs +++ b/atat/src/blocking/client.rs @@ -62,7 +62,7 @@ where } fn wait_response(&mut self, timeout: Duration) -> Result<(), Error> { - self.with_timeout(timeout, || self.res_slot.try_get().map(|_| ())) + self.with_timeout(timeout, || self.res_slot.signaled().then_some(())) .map_err(|_| Error::Timeout) } @@ -105,6 +105,7 @@ where cmd.parse(Ok(&[])) } else { self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; + assert!(self.res_slot.signaled()); let response = self.res_slot.try_get().unwrap(); let response: &Response = &response.borrow(); cmd.parse(response.into()) diff --git a/atat/src/response_slot.rs b/atat/src/response_slot.rs index 3fd43aa8..642cf270 100644 --- a/atat/src/response_slot.rs +++ b/atat/src/response_slot.rs @@ -32,15 +32,25 @@ impl ResponseSlot { self.1.reset(); } + /// Get whether a response is available + pub fn signaled(&self) -> bool { + self.1.signaled() + } + + /// Wait for a response to become available + pub async fn wait(&self) { + self.1.wait().await; + } + /// Wait for a response to become available and get a guard to the response - pub async fn wait<'a>(&'a self) -> ResponseSlotGuard<'a, N> { + pub async fn get<'a>(&'a self) -> ResponseSlotGuard<'a, N> { self.1.wait().await; // The mutex is not locked when signal is emitted self.0.try_lock().unwrap() } - /// Wait for a response to become available and get a guard to the response + /// If available, get a guard to the response pub fn try_get<'a>(&'a self) -> Option> { if self.1.signaled() { // The mutex is not locked when signal is emitted @@ -56,7 +66,6 @@ impl ResponseSlot { } // Not currently signaled: We know that the client is not currently holding the response slot guard - { let buf = self.0.try_lock().unwrap(); let mut res = buf.borrow_mut(); @@ -77,7 +86,6 @@ impl ResponseSlot { } // Not currently signaled: We know that the client is not currently holding the response slot guard - { let buf = self.0.try_lock().unwrap(); let mut res = buf.borrow_mut(); From a843d3d8774704de3ae19d0c6b821e751a9d15f3 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 15:25:29 +0100 Subject: [PATCH 22/23] Make failing tests pass --- atat/src/asynch/client.rs | 26 ++++++++++++++++---------- atat/src/blocking/client.rs | 17 +++++++++++------ atat/src/response_slot.rs | 14 ++------------ 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/atat/src/asynch/client.rs b/atat/src/asynch/client.rs index 803ef43e..7b9ceffb 100644 --- a/atat/src/asynch/client.rs +++ b/atat/src/asynch/client.rs @@ -1,5 +1,9 @@ use super::AtatClient; -use crate::{helpers::LossyStr, response_slot::ResponseSlot, AtatCmd, Config, Error, Response}; +use crate::{ + helpers::LossyStr, + response_slot::{ResponseSlot, ResponseSlotGuard}, + AtatCmd, Config, Error, Response, +}; use embassy_time::{Duration, Instant, TimeoutError, Timer}; use embedded_io_async::Write; use futures::{ @@ -54,8 +58,11 @@ impl<'a, W: Write, const INGRESS_BUF_SIZE: usize> Client<'a, W, INGRESS_BUF_SIZE Ok(()) } - async fn wait_response(&mut self, timeout: Duration) -> Result<(), Error> { - self.with_timeout(timeout, self.res_slot.wait()) + async fn wait_response<'guard>( + &'guard mut self, + timeout: Duration, + ) -> Result, Error> { + self.with_timeout(timeout, self.res_slot.get()) .await .map_err(|_| Error::Timeout) } @@ -103,10 +110,9 @@ impl AtatClient for Client<'_, W, INGRE if !Cmd::EXPECTS_RESPONSE_CODE { cmd.parse(Ok(&[])) } else { - self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) + let response = self + .wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into())) .await?; - assert!(self.res_slot.signaled()); - let response = self.res_slot.try_get().unwrap(); let response: &Response = &response.borrow(); cmd.parse(response.into()) } @@ -191,7 +197,7 @@ mod tests { sent + Duration::from_millis(100) } - let (mut client, mut tx, _rx) = + let (mut client, mut tx, _slot) = setup!(Config::new().get_response_timeout(custom_response_timeout)); let cmd = SetModuleFunctionality { @@ -233,11 +239,11 @@ mod tests { sent + Duration::from_millis(200) } else { // Extended timeout - sent + Duration::from_millis(500) + sent + Duration::from_millis(50000) } } - let (mut client, mut tx, rx) = + let (mut client, mut tx, slot) = setup!(Config::new().get_response_timeout(custom_response_timeout)); let cmd = SetModuleFunctionality { @@ -249,7 +255,7 @@ mod tests { tx.next_message_pure().await; // Emit response in the extended timeout timeframe Timer::after(Duration::from_millis(300)).await; - rx.signal_response(Ok(&[])).unwrap(); + slot.signal_response(Ok(&[])).unwrap(); }); let send = tokio::spawn(async move { diff --git a/atat/src/blocking/client.rs b/atat/src/blocking/client.rs index 8d1baf27..6616545a 100644 --- a/atat/src/blocking/client.rs +++ b/atat/src/blocking/client.rs @@ -2,7 +2,11 @@ use embassy_time::{Duration, Instant, TimeoutError}; use embedded_io::Write; use super::{blocking_timer::BlockingTimer, AtatClient}; -use crate::{helpers::LossyStr, response_slot::ResponseSlot, AtatCmd, Config, Error, Response}; +use crate::{ + helpers::LossyStr, + response_slot::{ResponseSlot, ResponseSlotGuard}, + AtatCmd, Config, Error, Response, +}; /// Client responsible for handling send, receive and timeout from the /// userfacing side. The client is decoupled from the ingress-manager through @@ -61,8 +65,11 @@ where Ok(()) } - fn wait_response(&mut self, timeout: Duration) -> Result<(), Error> { - self.with_timeout(timeout, || self.res_slot.signaled().then_some(())) + fn wait_response<'guard>( + &'guard mut self, + timeout: Duration, + ) -> Result, Error> { + self.with_timeout(timeout, || self.res_slot.try_get()) .map_err(|_| Error::Timeout) } @@ -104,9 +111,7 @@ where if !Cmd::EXPECTS_RESPONSE_CODE { cmd.parse(Ok(&[])) } else { - self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; - assert!(self.res_slot.signaled()); - let response = self.res_slot.try_get().unwrap(); + let response = self.wait_response(Duration::from_millis(Cmd::MAX_TIMEOUT_MS.into()))?; let response: &Response = &response.borrow(); cmd.parse(response.into()) } diff --git a/atat/src/response_slot.rs b/atat/src/response_slot.rs index 642cf270..081bef85 100644 --- a/atat/src/response_slot.rs +++ b/atat/src/response_slot.rs @@ -32,17 +32,7 @@ impl ResponseSlot { self.1.reset(); } - /// Get whether a response is available - pub fn signaled(&self) -> bool { - self.1.signaled() - } - - /// Wait for a response to become available - pub async fn wait(&self) { - self.1.wait().await; - } - - /// Wait for a response to become available and get a guard to the response + /// Wait for a response to be signaled and get a guard to the response pub async fn get<'a>(&'a self) -> ResponseSlotGuard<'a, N> { self.1.wait().await; @@ -50,7 +40,7 @@ impl ResponseSlot { self.0.try_lock().unwrap() } - /// If available, get a guard to the response + /// If signaled, get a guard to the response pub fn try_get<'a>(&'a self) -> Option> { if self.1.signaled() { // The mutex is not locked when signal is emitted From aaa8730c0f61654e7b3b309bb5f291e62d080175 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Thu, 21 Dec 2023 15:51:00 +0100 Subject: [PATCH 23/23] Fix embassy example --- examples/Cargo.toml | 2 ++ examples/src/bin/embassy.rs | 12 +++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 5b88f354..a7def24e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -33,6 +33,8 @@ panic-probe = { version = "0.3.0", features = ["print-defmt"], optional = true } embassy-executor = { version = "0.3", features = [ "defmt", "nightly", + "arch-cortex-m", + "executor-thread", "integrated-timers", ], optional = true } embassy-time = "0.2" diff --git a/examples/src/bin/embassy.rs b/examples/src/bin/embassy.rs index be3af143..6c16d982 100644 --- a/examples/src/bin/embassy.rs +++ b/examples/src/bin/embassy.rs @@ -1,7 +1,6 @@ #![no_std] #![no_main] #![feature(type_alias_impl_trait)] -#![allow(incomplete_features)] use atat::{ asynch::{AtatClient, Client}, @@ -10,9 +9,9 @@ use atat::{ use atat_examples::common; use embassy_executor::Spawner; use embassy_rp::{ - interrupt, + bind_interrupts, peripherals::UART0, - uart::{self, BufferedUart, BufferedUartRx}, + uart::{self, BufferedInterruptHandler, BufferedUart, BufferedUartRx}, }; use {defmt_rtt as _, panic_probe as _}; @@ -20,18 +19,21 @@ const INGRESS_BUF_SIZE: usize = 1024; const URC_CAPACITY: usize = 128; const URC_SUBSCRIBERS: usize = 3; +bind_interrupts!(struct Irqs { + UART0_IRQ => BufferedInterruptHandler; +}); + #[embassy_executor::main] async fn main(spawner: Spawner) { let p = embassy_rp::init(Default::default()); let (tx_pin, rx_pin, uart) = (p.PIN_0, p.PIN_1, p.UART0); - let irq = interrupt::take!(UART0_IRQ); let tx_buf = static_cell::make_static!([0u8; 16]); let rx_buf = static_cell::make_static!([0u8; 16]); let uart = BufferedUart::new( uart, - irq, + Irqs, tx_pin, rx_pin, tx_buf,