Skip to content

Commit

Permalink
Fix(async): Ensure one sub on topic (#64)
Browse files Browse the repository at this point in the history
* Ensure only one sub on the same topic

* wait for mqtt connected in report

* Reduce request locks to one
  • Loading branch information
KennethKnudsen97 authored Oct 10, 2024
1 parent d712932 commit 01c44a0
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/shadows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod error;
mod shadow_diff;
pub mod topics;

use core::{marker::PhantomData, ops::DerefMut};
use core::{marker::PhantomData, ops::DerefMut, sync::atomic};

use bitmaps::{Bits, BitsImpl};
pub use data_types::Patch;
Expand Down Expand Up @@ -42,6 +42,9 @@ where
mqtt: &'m embedded_mqtt::MqttClient<'a, M, SUBS>,
subscription: Mutex<NoopRawMutex, Option<embedded_mqtt::Subscription<'a, 'm, M, SUBS, 2>>>,
_shadow: PhantomData<S>,
// request_lock is used to ensure that shadow operations such as subscribing, updating, or
// deleting are serialized, preventing multiple concurrent requests to the same MQTT topics.
request_lock: Mutex<NoopRawMutex, ()>,
}

impl<'a, 'm, M: RawMutex, S: ShadowState, const SUBS: usize> ShadowHandler<'a, 'm, M, S, SUBS>
Expand Down Expand Up @@ -95,6 +98,7 @@ where

if let Some(client) = delta.client_token {
if client.eq(self.mqtt.client_id()) {
warn!("DELTA CLIENT TOKEN WAS == TO DEVICE CLIENT ID");
return Ok(None);
}
}
Expand All @@ -105,6 +109,8 @@ where
/// Internal helper function for applying a delta state to the actual shadow
/// state, and update the cloud shadow.
async fn report<R: Serialize>(&self, reported: &R) -> Result<(), Error> {
let _update_requested_lock = self.request_lock.lock().await;

debug!(
"[{:?}] Updating reported shadow value.",
S::NAME.unwrap_or(CLASSIC_SHADOW),
Expand All @@ -127,6 +133,9 @@ where
S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD,
);

//Wait for mqtt to connect
self.mqtt.wait_connected().await;

let mut sub = self.publish_and_subscribe(Topic::Update, payload).await?;

//*** WAIT RESPONSE ***/
Expand Down Expand Up @@ -179,6 +188,8 @@ where

/// Initiate a `GetShadow` request, updating the local state from the cloud.
async fn get_shadow(&self) -> Result<DeltaState<S::PatchState>, Error> {
let _get_requested_lock = self.request_lock.lock().await;

//Wait for mqtt to connect
self.mqtt.wait_connected().await;

Expand Down Expand Up @@ -225,6 +236,8 @@ where
}

pub async fn delete_shadow(&self) -> Result<(), Error> {
let _delete_request = self.request_lock.lock().await;

// Wait for mqtt to connect
self.mqtt.wait_connected().await;

Expand Down Expand Up @@ -256,6 +269,8 @@ where
}

pub async fn create_shadow(&self) -> Result<DeltaState<S::PatchState>, Error> {
let _create_requested_lock = self.request_lock.lock().await;

debug!(
"[{:?}] Creating initial shadow value.",
S::NAME.unwrap_or(CLASSIC_SHADOW),
Expand Down Expand Up @@ -403,6 +418,7 @@ where
mqtt,
subscription: Mutex::new(None),
_shadow: PhantomData,
request_lock: Mutex::new(()),
};

Self {
Expand Down Expand Up @@ -528,6 +544,7 @@ where
mqtt,
subscription: Mutex::new(None),
_shadow: PhantomData,
request_lock: Mutex::new(()),
};
Self { handler, state }
}
Expand Down

0 comments on commit 01c44a0

Please sign in to comment.