Skip to content

Commit

Permalink
wait for accepted and rejected for delete and update as well
Browse files Browse the repository at this point in the history
  • Loading branch information
KennethKnudsen97 committed Jul 30, 2024
1 parent a89397c commit e2c9374
Showing 1 changed file with 114 additions and 64 deletions.
178 changes: 114 additions & 64 deletions src/shadows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,73 +130,42 @@ where
>(&request)
.map_err(|_| Error::Overflow)?;

let update_topic = Topic::Update.format::<MAX_TOPIC_LEN>(self.mqtt.client_id(), S::NAME)?;

self.mqtt
.publish(Publish {
dup: false,
qos: QoS::AtLeastOnce,
retain: false,
pid: None,
topic_name: update_topic.as_str(),
payload: payload.as_slice(),
properties: embedded_mqtt::Properties::Slice(&[]),
})
.await
.map_err(Error::MqttError)?;
let message = self
.publish_and_subscribe(Topic::Update, payload.as_slice())
.await?;

Ok(())
//Check if topic is GetAccepted
match Topic::from_str(message.topic_name()) {
Some((Topic::UpdateAccepted, _, _)) => Ok(()),
Some((Topic::UpdateRejected, _, _)) => {
match serde_json_core::from_slice::<ErrorResponse>(message.payload()) {
//Try to return shadow error from message error code. Return NotFound otherwise
Ok((error_response, _)) => {
if let Ok(shadow_error) = error_response.try_into() {
Err(Error::ShadowError(shadow_error))
} else {
Err(Error::ShadowError(error::ShadowError::NotFound))
}
}
Err(_) => {
error!("Error deserializing GetRejected message");
Err(Error::ShadowError(error::ShadowError::NotFound))
}
}
}
_ => {
error!("Expected Topic name GetRejected or GetAccepted but got something else");
Err(Error::WrongShadowName)
}
}
}

/// Initiate a `GetShadow` request, updating the local state from the cloud.
pub async fn get_shadow(&mut self) -> Result<S::PatchState, Error> {
//Wait for mqtt to connect
self.mqtt.wait_connected().await;

//Subscribe to accepted and rejected
let mut sub = self
.mqtt
.subscribe::<2>(Subscribe::new(&[
SubscribeTopic {
topic_path: topics::Topic::GetAccepted
.format::<64>(self.mqtt.client_id(), S::NAME)?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
},
SubscribeTopic {
topic_path: topics::Topic::GetRejected
.format::<64>(self.mqtt.client_id(), S::NAME)?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
},
]))
.await
.map_err(Error::MqttError)?;

//Initiate a get shadow request
let get_topic = Topic::Get.format::<MAX_TOPIC_LEN>(self.mqtt.client_id(), S::NAME)?;
self.mqtt
.publish(Publish {
dup: false,
qos: QoS::AtLeastOnce,
retain: false,
pid: None,
topic_name: get_topic.as_str(),
payload: b"",
properties: embedded_mqtt::Properties::Slice(&[]),
})
.await
.map_err(Error::MqttError)?;

debug!("Wait for GetAccepted or GetRejected");
//Wait for next message on topic
let get_message = sub.next_message().await.ok_or(Error::InvalidPayload)?;
let get_message = self.publish_and_subscribe(Topic::Get, b"").await?;

//Check if topic is GetAccepted
//Deserialize message
Expand Down Expand Up @@ -243,20 +212,99 @@ where
}

pub async fn delete_shadow(&mut self) -> Result<(), Error> {
let delete_topic = Topic::Delete.format::<MAX_TOPIC_LEN>(self.mqtt.client_id(), S::NAME)?;
//Wait for mqtt to connect
self.mqtt.wait_connected().await;

let message = self
.publish_and_subscribe(topics::Topic::Delete, b"")
.await?;

//Check if topic is DeleteAccepted
match Topic::from_str(message.topic_name()) {
Some((Topic::DeleteAccepted, _, _)) => Ok(()),
Some((Topic::DeleteRejected, _, _)) => {
match serde_json_core::from_slice::<ErrorResponse>(message.payload()) {
//Try to return shadow error from message error code. Return NotFound otherwise
Ok((error_response, _)) => {
if let Ok(shadow_error) = error_response.try_into() {
Err(Error::ShadowError(shadow_error))
} else {
Err(Error::ShadowError(error::ShadowError::NotFound))
}
}
Err(_) => {
error!("Error deserializing GetRejected message");
Err(Error::ShadowError(error::ShadowError::NotFound))
}
}
}
_ => {
error!("Expected Topic name GetRejected or GetAccepted but got something else");
Err(Error::WrongShadowName)
}
}
}
///This function will subscribe to accepted and rejected topics and then do a publish.
///It will only return when something is accepted or rejected
///Topic is the topic you want to publish to
///The function will automatically subscribe to the accepted and rejected topic related to the publish topic
async fn publish_and_subscribe(
&mut self,
topic: topics::Topic,
payload: &[u8],
) -> Result<embedded_mqtt::Message<'a, SUBS>, Error> {
let (accepted, rejected) = match topic {
Topic::Get => (Topic::GetAccepted, Topic::GetRejected),
Topic::Update => (Topic::UpdateAccepted, Topic::UpdateRejected),
Topic::Delete => (Topic::DeleteAccepted, Topic::DeleteRejected),
_ => return Err(Error::ShadowError(error::ShadowError::Forbidden)),
};

//*** SUBSCRIBE ***/
let mut sub = self
.mqtt
.subscribe::<2>(Subscribe::new(&[
SubscribeTopic {
topic_path: accepted
.format::<64>(self.mqtt.client_id(), S::NAME)?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
},
SubscribeTopic {
topic_path: rejected
.format::<64>(self.mqtt.client_id(), S::NAME)?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
},
]))
.await
.map_err(Error::MqttError)?;

//*** PUBLISH REQUEST ***/
let topic_name = topic.format::<MAX_TOPIC_LEN>(self.mqtt.client_id(), S::NAME)?;
self.mqtt
.publish(Publish {
dup: false,
qos: QoS::AtLeastOnce,
retain: false,
pid: None,
topic_name: delete_topic.as_str(),
payload: b"",
topic_name: topic_name.as_str(),
payload,
properties: embedded_mqtt::Properties::Slice(&[]),
})
.await
.map_err(Error::MqttError)?;
Ok(())

//*** WAIT RESPONSE ***/
debug!("Wait for Accepted or Rejected");
let message = sub.next_message().await.ok_or(Error::InvalidPayload)?;
Ok(message)
}
}

Expand Down Expand Up @@ -358,7 +406,9 @@ where
}

pub async fn delete_shadow(&mut self) -> Result<(), Error> {
self.handler.delete_shadow().await
self.handler.delete_shadow().await?;
self.dao.write(&S::default()).await?;
Ok(())
}
}

Expand Down

0 comments on commit e2c9374

Please sign in to comment.