Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix(async): Ensure one sub on topic #64

Merged
merged 4 commits into from
Oct 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/shadows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod error;
mod shadow_diff;
pub mod topics;

use core::{marker::PhantomData, ops::DerefMut};
use core::{marker::PhantomData, ops::DerefMut, sync::atomic};

use bitmaps::{Bits, BitsImpl};
pub use data_types::Patch;
Expand Down Expand Up @@ -42,6 +42,9 @@ where
mqtt: &'m embedded_mqtt::MqttClient<'a, M, SUBS>,
subscription: Mutex<NoopRawMutex, Option<embedded_mqtt::Subscription<'a, 'm, M, SUBS, 2>>>,
_shadow: PhantomData<S>,
// request_lock is used to ensure that shadow operations such as subscribing, updating, or
// deleting are serialized, preventing multiple concurrent requests to the same MQTT topics.
request_lock: Mutex<NoopRawMutex, ()>,
}

impl<'a, 'm, M: RawMutex, S: ShadowState, const SUBS: usize> ShadowHandler<'a, 'm, M, S, SUBS>
Expand Down Expand Up @@ -95,6 +98,7 @@ where

if let Some(client) = delta.client_token {
if client.eq(self.mqtt.client_id()) {
warn!("DELTA CLIENT TOKEN WAS == TO DEVICE CLIENT ID");
return Ok(None);
}
}
Expand All @@ -105,6 +109,8 @@ where
/// Internal helper function for applying a delta state to the actual shadow
/// state, and update the cloud shadow.
async fn report<R: Serialize>(&self, reported: &R) -> Result<(), Error> {
let _update_requested_lock = self.request_lock.lock().await;

debug!(
"[{:?}] Updating reported shadow value.",
S::NAME.unwrap_or(CLASSIC_SHADOW),
Expand All @@ -127,6 +133,9 @@ where
S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD,
);

//Wait for mqtt to connect
self.mqtt.wait_connected().await;

let mut sub = self.publish_and_subscribe(Topic::Update, payload).await?;

//*** WAIT RESPONSE ***/
Expand Down Expand Up @@ -179,6 +188,8 @@ where

/// Initiate a `GetShadow` request, updating the local state from the cloud.
async fn get_shadow(&self) -> Result<DeltaState<S::PatchState>, Error> {
let _get_requested_lock = self.request_lock.lock().await;

//Wait for mqtt to connect
self.mqtt.wait_connected().await;

Expand Down Expand Up @@ -225,6 +236,8 @@ where
}

pub async fn delete_shadow(&self) -> Result<(), Error> {
let _delete_request = self.request_lock.lock().await;

// Wait for mqtt to connect
self.mqtt.wait_connected().await;

Expand Down Expand Up @@ -256,6 +269,8 @@ where
}

pub async fn create_shadow(&self) -> Result<DeltaState<S::PatchState>, Error> {
let _create_requested_lock = self.request_lock.lock().await;

debug!(
"[{:?}] Creating initial shadow value.",
S::NAME.unwrap_or(CLASSIC_SHADOW),
Expand Down Expand Up @@ -403,6 +418,7 @@ where
mqtt,
subscription: Mutex::new(None),
_shadow: PhantomData,
request_lock: Mutex::new(()),
};

Self {
Expand Down Expand Up @@ -528,6 +544,7 @@ where
mqtt,
subscription: Mutex::new(None),
_shadow: PhantomData,
request_lock: Mutex::new(()),
};
Self { handler, state }
}
Expand Down
Loading