diff --git a/src/shadows/mod.rs b/src/shadows/mod.rs index fa9d487..2f40f8f 100644 --- a/src/shadows/mod.rs +++ b/src/shadows/mod.rs @@ -130,22 +130,34 @@ where >(&request) .map_err(|_| Error::Overflow)?; - let update_topic = Topic::Update.format::(self.mqtt.client_id(), S::NAME)?; - - self.mqtt - .publish(Publish { - dup: false, - qos: QoS::AtLeastOnce, - retain: false, - pid: None, - topic_name: update_topic.as_str(), - payload: payload.as_slice(), - properties: embedded_mqtt::Properties::Slice(&[]), - }) - .await - .map_err(Error::MqttError)?; + let message = self + .publish_and_subscribe(Topic::Update, payload.as_slice()) + .await?; - Ok(()) + //Check if topic is GetAccepted + match Topic::from_str(message.topic_name()) { + Some((Topic::UpdateAccepted, _, _)) => Ok(()), + Some((Topic::UpdateRejected, _, _)) => { + match serde_json_core::from_slice::(message.payload()) { + //Try to return shadow error from message error code. Return NotFound otherwise + Ok((error_response, _)) => { + if let Ok(shadow_error) = error_response.try_into() { + Err(Error::ShadowError(shadow_error)) + } else { + Err(Error::ShadowError(error::ShadowError::NotFound)) + } + } + Err(_) => { + error!("Error deserializing GetRejected message"); + Err(Error::ShadowError(error::ShadowError::NotFound)) + } + } + } + _ => { + error!("Expected Topic name GetRejected or GetAccepted but got something else"); + Err(Error::WrongShadowName) + } + } } /// Initiate a `GetShadow` request, updating the local state from the cloud. @@ -153,50 +165,7 @@ where //Wait for mqtt to connect self.mqtt.wait_connected().await; - //Subscribe to accepted and rejected - let mut sub = self - .mqtt - .subscribe::<2>(Subscribe::new(&[ - SubscribeTopic { - topic_path: topics::Topic::GetAccepted - .format::<64>(self.mqtt.client_id(), S::NAME)? - .as_str(), - maximum_qos: QoS::AtLeastOnce, - no_local: false, - retain_as_published: false, - retain_handling: RetainHandling::SendAtSubscribeTime, - }, - SubscribeTopic { - topic_path: topics::Topic::GetRejected - .format::<64>(self.mqtt.client_id(), S::NAME)? - .as_str(), - maximum_qos: QoS::AtLeastOnce, - no_local: false, - retain_as_published: false, - retain_handling: RetainHandling::SendAtSubscribeTime, - }, - ])) - .await - .map_err(Error::MqttError)?; - - //Initiate a get shadow request - let get_topic = Topic::Get.format::(self.mqtt.client_id(), S::NAME)?; - self.mqtt - .publish(Publish { - dup: false, - qos: QoS::AtLeastOnce, - retain: false, - pid: None, - topic_name: get_topic.as_str(), - payload: b"", - properties: embedded_mqtt::Properties::Slice(&[]), - }) - .await - .map_err(Error::MqttError)?; - - debug!("Wait for GetAccepted or GetRejected"); - //Wait for next message on topic - let get_message = sub.next_message().await.ok_or(Error::InvalidPayload)?; + let get_message = self.publish_and_subscribe(Topic::Get, b"").await?; //Check if topic is GetAccepted //Deserialize message @@ -243,20 +212,99 @@ where } pub async fn delete_shadow(&mut self) -> Result<(), Error> { - let delete_topic = Topic::Delete.format::(self.mqtt.client_id(), S::NAME)?; + //Wait for mqtt to connect + self.mqtt.wait_connected().await; + + let message = self + .publish_and_subscribe(topics::Topic::Delete, b"") + .await?; + + //Check if topic is DeleteAccepted + match Topic::from_str(message.topic_name()) { + Some((Topic::DeleteAccepted, _, _)) => Ok(()), + Some((Topic::DeleteRejected, _, _)) => { + match serde_json_core::from_slice::(message.payload()) { + //Try to return shadow error from message error code. Return NotFound otherwise + Ok((error_response, _)) => { + if let Ok(shadow_error) = error_response.try_into() { + Err(Error::ShadowError(shadow_error)) + } else { + Err(Error::ShadowError(error::ShadowError::NotFound)) + } + } + Err(_) => { + error!("Error deserializing GetRejected message"); + Err(Error::ShadowError(error::ShadowError::NotFound)) + } + } + } + _ => { + error!("Expected Topic name GetRejected or GetAccepted but got something else"); + Err(Error::WrongShadowName) + } + } + } + ///This function will subscribe to accepted and rejected topics and then do a publish. + ///It will only return when something is accepted or rejected + ///Topic is the topic you want to publish to + ///The function will automatically subscribe to the accepted and rejected topic related to the publish topic + async fn publish_and_subscribe( + &mut self, + topic: topics::Topic, + payload: &[u8], + ) -> Result, Error> { + let (accepted, rejected) = match topic { + Topic::Get => (Topic::GetAccepted, Topic::GetRejected), + Topic::Update => (Topic::UpdateAccepted, Topic::UpdateRejected), + Topic::Delete => (Topic::DeleteAccepted, Topic::DeleteRejected), + _ => return Err(Error::ShadowError(error::ShadowError::Forbidden)), + }; + + //*** SUBSCRIBE ***/ + let mut sub = self + .mqtt + .subscribe::<2>(Subscribe::new(&[ + SubscribeTopic { + topic_path: accepted + .format::<64>(self.mqtt.client_id(), S::NAME)? + .as_str(), + maximum_qos: QoS::AtLeastOnce, + no_local: false, + retain_as_published: false, + retain_handling: RetainHandling::SendAtSubscribeTime, + }, + SubscribeTopic { + topic_path: rejected + .format::<64>(self.mqtt.client_id(), S::NAME)? + .as_str(), + maximum_qos: QoS::AtLeastOnce, + no_local: false, + retain_as_published: false, + retain_handling: RetainHandling::SendAtSubscribeTime, + }, + ])) + .await + .map_err(Error::MqttError)?; + + //*** PUBLISH REQUEST ***/ + let topic_name = topic.format::(self.mqtt.client_id(), S::NAME)?; self.mqtt .publish(Publish { dup: false, qos: QoS::AtLeastOnce, retain: false, pid: None, - topic_name: delete_topic.as_str(), - payload: b"", + topic_name: topic_name.as_str(), + payload, properties: embedded_mqtt::Properties::Slice(&[]), }) .await .map_err(Error::MqttError)?; - Ok(()) + + //*** WAIT RESPONSE ***/ + debug!("Wait for Accepted or Rejected"); + let message = sub.next_message().await.ok_or(Error::InvalidPayload)?; + Ok(message) } } @@ -358,7 +406,9 @@ where } pub async fn delete_shadow(&mut self) -> Result<(), Error> { - self.handler.delete_shadow().await + self.handler.delete_shadow().await?; + self.dao.write(&S::default()).await?; + Ok(()) } }