Skip to content

Commit

Permalink
Fix some issues around SyncMessage (#210)
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon authored Nov 19, 2023
1 parent 96ccb73 commit 7f90577
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 50 deletions.
66 changes: 40 additions & 26 deletions presage-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ enum Cmd {
#[clap(long, short = 'k', help = "Master Key of the V2 group (hex string)", value_parser = parse_group_master_key)]
master_key: GroupMasterKeyBytes,
},
RequestSyncContacts,
RequestContactsSync,
}

enum Recipient {
Contact(Uuid),
Group(GroupMasterKeyBytes),
}

fn parse_group_master_key(value: &str) -> anyhow::Result<GroupMasterKeyBytes> {
Expand Down Expand Up @@ -209,22 +214,21 @@ async fn main() -> anyhow::Result<()> {
}

async fn send<S: Store + 'static>(
msg: &str,
uuid: &Uuid,
manager: &mut Manager<S, Registered>,
recipient: Recipient,
msg: impl Into<ContentBody>,
) -> anyhow::Result<()> {
let local = task::LocalSet::new();

let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let message = ContentBody::DataMessage(DataMessage {
body: Some(msg.to_string()),
timestamp: Some(timestamp),
..Default::default()
});

let local = task::LocalSet::new();
let mut content_body = msg.into();
if let ContentBody::DataMessage(d) = &mut content_body {
d.timestamp = Some(timestamp);
}

local
.run_until(async move {
Expand All @@ -235,10 +239,22 @@ async fn send<S: Store + 'static>(
}
});

manager
.send_message(*uuid, message, timestamp)
.await
.unwrap();
match recipient {
Recipient::Contact(uuid) => {
info!("sending message to contact");
manager
.send_message(uuid, content_body, timestamp)
.await
.expect("failed to send message");
}
Recipient::Group(master_key) => {
info!("sending message to group");
manager
.send_message_to_group(&master_key, content_body, timestamp)
.await
.expect("failed to send message");
}
}
})
.await;

Expand Down Expand Up @@ -529,22 +545,22 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
}
Cmd::Send { uuid, message } => {
let mut manager = Manager::load_registered(config_store).await?;
send(&message, &uuid, &mut manager).await?;

let data_message = DataMessage {
body: Some(message),
..Default::default()
};

send(&mut manager, Recipient::Contact(uuid), data_message).await?;
}
Cmd::SendToGroup {
message,
master_key,
} => {
let mut manager = Manager::load_registered(config_store).await?;

let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let data_message = DataMessage {
body: Some(message),
timestamp: Some(timestamp),
group_v2: Some(GroupContextV2 {
master_key: Some(master_key.to_vec()),
revision: Some(0),
Expand All @@ -553,9 +569,7 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
..Default::default()
};

manager
.send_message_to_group(&master_key, data_message, timestamp)
.await?;
send(&mut manager, Recipient::Group(master_key), data_message).await?;
}
Cmd::Unregister => unimplemented!(),
Cmd::RetrieveProfile {
Expand Down Expand Up @@ -656,9 +670,9 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
println!("{contact:#?}");
}
}
Cmd::RequestSyncContacts => {
Cmd::RequestContactsSync => {
let mut manager = Manager::load_registered(config_store).await?;
manager.request_contacts_sync().await?;
manager.sync_contacts().await?;
}
Cmd::ListMessages {
group_master_key,
Expand Down
2 changes: 1 addition & 1 deletion presage-store-sled/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ impl IdentityKeyStore for SledStore {
"failed to load registration ID",
"no registration data".into(),
))?;
Ok(data.registration_id())
Ok(data.registration_id)
}

async fn save_identity(
Expand Down
4 changes: 2 additions & 2 deletions presage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ authors = ["Gabriel Féron <[email protected]>"]
edition = "2021"

[dependencies]
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "afb5114" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "afb5114" }
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "6fc62c8" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "6fc62c8" }

base64 = "0.21"
futures = "0.3"
Expand Down
72 changes: 51 additions & 21 deletions presage/src/manager/registered.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::fmt;
use std::ops::RangeBounds;
use std::pin::pin;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use futures::{future, AsyncReadExt, Stream, StreamExt};
use libsignal_service::attachment_cipher::decrypt_in_place;
Expand Down Expand Up @@ -116,26 +117,32 @@ pub struct RegistrationData {
}

impl RegistrationData {
pub fn registration_id(&self) -> u32 {
self.registration_id
/// Account identity
pub fn aci(&self) -> Uuid {
self.service_ids.aci
}

pub fn service_ids(&self) -> &ServiceIds {
&self.service_ids
/// Phone number identity
pub fn pni(&self) -> Uuid {
self.service_ids.pni
}

/// Our own profile key
pub fn profile_key(&self) -> ProfileKey {
self.profile_key
}

pub fn device_name(&self) -> Option<&String> {
self.device_name.as_ref()
/// The name of the device (if linked as secondary)
pub fn device_name(&self) -> Option<&str> {
self.device_name.as_deref()
}

/// Account identity public key
pub fn aci_public_key(&self) -> PublicKey {
self.aci_public_key
}

/// Account identity private key
pub fn aci_private_key(&self) -> PrivateKey {
self.aci_private_key
}
Expand Down Expand Up @@ -283,7 +290,7 @@ impl<S: Store> Manager<S, Registered> {

account_manager
.set_account_attributes(AccountAttributes {
name: self.state.data.device_name().cloned(),
name: self.state.data.device_name().map(|d| d.to_string()),
registration_id: self.state.data.registration_id,
pni_registration_id,
signaling_key: None,
Expand Down Expand Up @@ -316,19 +323,40 @@ impl<S: Store> Manager<S, Registered> {
Ok(())
}

/// Request that the primary device to encrypt & send all of its contacts as a message to ourselves
/// which can be then received, decrypted and stored in the message receiving loop.
/// Requests contacts synchronization and waits until the primary device sends them
///
/// Note: DO NOT call this function if you're already running a receiving loop
pub async fn sync_contacts(&mut self) -> Result<(), Error<S::Error>> {
debug!("synchronizing contacts");

let mut messages = pin!(
self.receive_messages_with_mode(ReceivingMode::WaitForContacts)
.await?
);

self.request_contacts().await?;

tokio::time::timeout(Duration::from_secs(60), async move {
while let Some(msg) = messages.next().await {
log::trace!("got message while waiting for contacts sync: {msg:?}");
}
})
.await?;

Ok(())
}

/// Request the primary device to encrypt & send all of its contacts.
///
/// **Note**: If successful, the contacts are not yet received and stored, but will only be
/// processed when they're received using the `MessageReceiver`.
pub async fn request_contacts_sync(&mut self) -> Result<(), Error<S::Error>> {
/// processed when they're received after polling on the
pub async fn request_contacts(&mut self) -> Result<(), Error<S::Error>> {
trace!("requesting contacts sync");
let var_name = sync_message::request::Type::Contacts as i32;
let sync_message = SyncMessage {
request: Some(sync_message::Request {
r#type: Some(var_name),
r#type: Some(sync_message::request::Type::Contacts.into()),
}),
..Default::default()
..SyncMessage::with_padding()
};

let timestamp = SystemTime::now()
Expand Down Expand Up @@ -704,15 +732,17 @@ impl<S: Store> Manager<S, Registered> {
pub async fn send_message_to_group(
&mut self,
master_key_bytes: &[u8],
mut message: DataMessage,
message: impl Into<ContentBody>,
timestamp: u64,
) -> Result<(), Error<S::Error>> {
let mut content_body = message.into();

// Only update the expiration timer if it is not set.
match message {
DataMessage {
match content_body {
ContentBody::DataMessage(DataMessage {
expire_timer: ref mut timer,
..
} if timer.is_none() => {
}) if timer.is_none() => {
// Set the expire timer to None for errors.
let store_expire_timer = self
.store
Expand Down Expand Up @@ -755,7 +785,7 @@ impl<S: Store> Manager<S, Registered> {

let online_only = false;
let results = sender
.send_message_to_group(recipients, message.clone(), timestamp, online_only)
.send_message_to_group(recipients, content_body.clone(), timestamp, online_only)
.await;

// return first error if any
Expand All @@ -769,7 +799,7 @@ impl<S: Store> Manager<S, Registered> {
needs_receipt: false, // TODO: this is just wrong
unidentified_sender: false,
},
body: message.into(),
body: content_body,
};

save_message(&mut self.store, content)?;
Expand Down

0 comments on commit 7f90577

Please sign in to comment.