diff --git a/presage-store-sled/src/lib.rs b/presage-store-sled/src/lib.rs index 87cf26640..395cda70e 100644 --- a/presage-store-sled/src/lib.rs +++ b/presage-store-sled/src/lib.rs @@ -428,14 +428,9 @@ impl ContentsStore for SledStore { Ok(()) } - fn save_contacts( - &mut self, - contacts: impl Iterator, - ) -> Result<(), SledStoreError> { - for contact in contacts { - self.insert(SLED_TREE_CONTACTS, contact.uuid, contact)?; - } - debug!("saved contacts"); + fn save_contact(&mut self, contact: &Contact) -> Result<(), SledStoreError> { + self.insert(SLED_TREE_CONTACTS, contact.uuid, contact)?; + debug!("saved contact"); Ok(()) } diff --git a/presage/src/manager/registered.rs b/presage/src/manager/registered.rs index 4227537bc..61b6c397e 100644 --- a/presage/src/manager/registered.rs +++ b/presage/src/manager/registered.rs @@ -13,10 +13,11 @@ use libsignal_service::messagepipe::{Incoming, MessagePipe, ServiceCredentials}; use libsignal_service::models::Contact; use libsignal_service::prelude::phonenumber::PhoneNumber; use libsignal_service::prelude::Uuid; +use libsignal_service::profile_cipher::ProfileCipher; use libsignal_service::proto::data_message::Delete; use libsignal_service::proto::{ sync_message, AttachmentPointer, DataMessage, EditMessage, GroupContextV2, NullMessage, - SyncMessage, + SyncMessage, Verified, }; use libsignal_service::protocol::SenderCertificate; use libsignal_service::protocol::{PrivateKey, PublicKey}; @@ -508,6 +509,7 @@ impl Manager { encrypted_messages: S, message_receiver: MessageReceiver, service_cipher: ServiceCipher, + push_service: HyperPushService, store: C, groups_manager: GroupsManager, mode: ReceivingMode, @@ -517,6 +519,7 @@ impl Manager { encrypted_messages: Box::pin(self.receive_messages_encrypted().await?), message_receiver: MessageReceiver::new(self.identified_push_service()), service_cipher: self.new_service_cipher()?, + push_service: self.identified_push_service(), store: self.store.clone(), groups_manager: self.groups_manager()?, mode, @@ -539,15 +542,11 @@ impl Manager { match state.message_receiver.retrieve_contacts(contacts).await { Ok(contacts) => { let _ = state.store.clear_contacts(); - match state - .store - .save_contacts(contacts.filter_map(Result::ok)) - { - Ok(()) => { - info!("saved contacts"); - } - Err(e) => { + info!("saving contacts"); + for contact in contacts.filter_map(Result::ok) { + if let Err(e) = state.store.save_contact(&contact) { warn!("failed to save contacts: {e}"); + break; } } } @@ -603,7 +602,13 @@ impl Manager { } } - if let Err(e) = save_message(&mut state.store, content.clone()) { + if let Err(e) = save_message( + &mut state.store, + &mut state.push_service, + content.clone(), + ) + .await + { error!("Error saving message to store: {}", e); } @@ -704,7 +709,8 @@ impl Manager { body: content_body, }; - save_message(&mut self.store, content)?; + let mut push_service = self.identified_push_service(); + save_message(&mut self.store, &mut push_service, content).await?; Ok(()) } @@ -802,7 +808,8 @@ impl Manager { body: content_body, }; - save_message(&mut self.store, content)?; + let mut push_service = self.identified_push_service(); + save_message(&mut self.store, &mut push_service, content).await?; Ok(()) } @@ -1010,7 +1017,11 @@ async fn upsert_group( Ok(store.group(master_key_bytes.try_into()?)?) } -fn save_message(store: &mut S, message: Content) -> Result<(), Error> { +async fn save_message( + store: &mut S, + push_service: &mut HyperPushService, + message: Content, +) -> Result<(), Error> { // derive the thread from the message type let thread = Thread::try_from(&message)?; @@ -1039,6 +1050,44 @@ fn save_message(store: &mut S, message: Content) -> Result<(), Error Result<(), Self::ContentsStoreError>; - /// Replace all contact data - fn save_contacts( - &mut self, - contacts: impl Iterator, - ) -> Result<(), Self::ContentsStoreError>; + /// Save a contact + fn save_contact(&mut self, contacts: &Contact) -> Result<(), Self::ContentsStoreError>; /// Get an iterator on all stored (synchronized) contacts fn contacts(&self) -> Result;