Skip to content

Commit

Permalink
Upsert a contact when seeing it for the first time (#211)
Browse files Browse the repository at this point in the history
Also:

* Remove unimplemented CLI subcommands
* Fix upserting profile keys when receiving messages
  • Loading branch information
gferon committed Nov 20, 2023
1 parent 7f90577 commit 676cf3a
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 87 deletions.
29 changes: 4 additions & 25 deletions presage-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ enum Cmd {
#[clap(long, help = "Force to register again if already registered")]
force: bool,
},
#[clap(about = "Unregister from Signal")]
Unregister,
#[clap(
about = "Generate a QR code to scan with Signal for iOS or Android to link this client as secondary device"
)]
LinkDevice {
/// Possible values: staging, production
#[clap(long, short = 's', default_value = "production")]
Expand All @@ -106,16 +101,6 @@ enum Cmd {
#[clap(long, value_parser = parse_base64_profile_key)]
profile_key: Option<ProfileKey>,
},
#[clap(about = "Set a name, status and avatar")]
UpdateProfile,
#[clap(about = "Check if a user is registered on Signal")]
GetUserStatus,
#[clap(about = "Block contacts or groups")]
Block,
#[clap(about = "Unblock contacts or groups")]
Unblock,
#[clap(about = "Update the details of a contact")]
UpdateContact,
#[clap(about = "Receive all pending messages and saves them to disk")]
Receive {
#[clap(long = "notifications", short = 'n')]
Expand Down Expand Up @@ -420,15 +405,15 @@ fn print_message<S: Store>(
let ts = content.timestamp();
let (prefix, body) = match msg {
Msg::Received(Thread::Contact(sender), body) => {
let contact = format_contact(*sender);
let contact = format_contact(sender);
(format!("From {contact} @ {ts}: "), body)
}
Msg::Sent(Thread::Contact(recipient), body) => {
let contact = format_contact(*recipient);
let contact = format_contact(recipient);
(format!("To {contact} @ {ts}"), body)
}
Msg::Received(Thread::Group(key), body) => {
let sender = format_contact(content.metadata.sender.uuid);
let sender = format_contact(&content.metadata.sender.uuid);
let group = format_group(*key);
(format!("From {sender} to group {group} @ {ts}: "), body)
}
Expand Down Expand Up @@ -571,7 +556,6 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re

send(&mut manager, Recipient::Group(master_key), data_message).await?;
}
Cmd::Unregister => unimplemented!(),
Cmd::RetrieveProfile {
uuid,
mut profile_key,
Expand Down Expand Up @@ -599,11 +583,6 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
};
println!("{profile:#?}");
}
Cmd::UpdateProfile => unimplemented!(),
Cmd::GetUserStatus => unimplemented!(),
Cmd::Block => unimplemented!(),
Cmd::Unblock => unimplemented!(),
Cmd::UpdateContact => unimplemented!(),
Cmd::ListGroups => {
let manager = Manager::load_registered(config_store).await?;
for group in manager.store().groups()? {
Expand Down Expand Up @@ -648,7 +627,7 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
}
Cmd::GetContact { ref uuid } => {
let manager = Manager::load_registered(config_store).await?;
match manager.store().contact_by_id(*uuid)? {
match manager.store().contact_by_id(uuid)? {
Some(contact) => println!("{contact:#?}"),
None => eprintln!("Could not find contact for {uuid}"),
}
Expand Down
13 changes: 4 additions & 9 deletions presage-store-sled/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,9 @@ impl ContentsStore for SledStore {
Ok(())
}

fn save_contacts(
&mut self,
contacts: impl Iterator<Item = Contact>,
) -> Result<(), SledStoreError> {
for contact in contacts {
self.insert(SLED_TREE_CONTACTS, contact.uuid, contact)?;
}
debug!("saved contacts");
fn save_contact(&mut self, contact: &Contact) -> Result<(), SledStoreError> {
self.insert(SLED_TREE_CONTACTS, contact.uuid, contact)?;
debug!("saved contact");
Ok(())
}

Expand All @@ -447,7 +442,7 @@ impl ContentsStore for SledStore {
})
}

fn contact_by_id(&self, id: Uuid) -> Result<Option<Contact>, SledStoreError> {
fn contact_by_id(&self, id: &Uuid) -> Result<Option<Contact>, SledStoreError> {
self.get(SLED_TREE_CONTACTS, id)
}

Expand Down
157 changes: 111 additions & 46 deletions presage/src/manager/registered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use libsignal_service::messagepipe::{Incoming, MessagePipe, ServiceCredentials};
use libsignal_service::models::Contact;
use libsignal_service::prelude::phonenumber::PhoneNumber;
use libsignal_service::prelude::Uuid;
use libsignal_service::profile_cipher::ProfileCipher;
use libsignal_service::proto::data_message::Delete;
use libsignal_service::proto::{
sync_message, AttachmentPointer, DataMessage, EditMessage, GroupContextV2, NullMessage,
SyncMessage,
SyncMessage, Verified,
};
use libsignal_service::protocol::SenderCertificate;
use libsignal_service::protocol::{PrivateKey, PublicKey};
Expand Down Expand Up @@ -508,6 +509,7 @@ impl<S: Store> Manager<S, Registered> {
encrypted_messages: S,
message_receiver: MessageReceiver<HyperPushService>,
service_cipher: ServiceCipher<C>,
push_service: HyperPushService,
store: C,
groups_manager: GroupsManager<HyperPushService, InMemoryCredentialsCache>,
mode: ReceivingMode,
Expand All @@ -517,6 +519,7 @@ impl<S: Store> Manager<S, Registered> {
encrypted_messages: Box::pin(self.receive_messages_encrypted().await?),
message_receiver: MessageReceiver::new(self.identified_push_service()),
service_cipher: self.new_service_cipher()?,
push_service: self.identified_push_service(),
store: self.store.clone(),
groups_manager: self.groups_manager()?,
mode,
Expand All @@ -539,15 +542,11 @@ impl<S: Store> Manager<S, Registered> {
match state.message_receiver.retrieve_contacts(contacts).await {
Ok(contacts) => {
let _ = state.store.clear_contacts();
match state
.store
.save_contacts(contacts.filter_map(Result::ok))
{
Ok(()) => {
info!("saved contacts");
}
Err(e) => {
info!("saving contacts");
for contact in contacts.filter_map(Result::ok) {
if let Err(e) = state.store.save_contact(&contact) {
warn!("failed to save contacts: {e}");
break;
}
}
}
Expand Down Expand Up @@ -603,7 +602,13 @@ impl<S: Store> Manager<S, Registered> {
}
}

if let Err(e) = save_message(&mut state.store, content.clone()) {
if let Err(e) = save_message(
&mut state.store,
&mut state.push_service,
content.clone(),
)
.await
{
error!("Error saving message to store: {}", e);
}

Expand Down Expand Up @@ -704,7 +709,8 @@ impl<S: Store> Manager<S, Registered> {
body: content_body,
};

save_message(&mut self.store, content)?;
let mut push_service = self.identified_push_service();
save_message(&mut self.store, &mut push_service, content).await?;

Ok(())
}
Expand Down Expand Up @@ -802,7 +808,8 @@ impl<S: Store> Manager<S, Registered> {
body: content_body,
};

save_message(&mut self.store, content)?;
let mut push_service = self.identified_push_service();
save_message(&mut self.store, &mut push_service, content).await?;

Ok(())
}
Expand Down Expand Up @@ -899,7 +906,7 @@ impl<S: Store> Manager<S, Registered> {
pub async fn thread_title(&self, thread: &Thread) -> Result<String, Error<S::Error>> {
match thread {
Thread::Contact(uuid) => {
let contact = match self.store.contact_by_id(*uuid) {
let contact = match self.store.contact_by_id(uuid) {
Ok(contact) => contact,
Err(e) => {
info!("Error getting contact by id: {}, {:?}", e, uuid);
Expand All @@ -925,7 +932,7 @@ impl<S: Store> Manager<S, Registered> {
/// Note: this only currently works when linked as secondary device (the contacts are sent by the primary device at linking time)
#[deprecated = "use the store handle directly"]
pub fn contact_by_id(&self, id: &Uuid) -> Result<Option<Contact>, Error<S::Error>> {
Ok(self.store.contact_by_id(*id)?)
Ok(self.store.contact_by_id(id)?)
}

/// Returns an iterator on contacts stored in the [Store].
Expand Down Expand Up @@ -1010,52 +1017,110 @@ async fn upsert_group<S: Store>(
Ok(store.group(master_key_bytes.try_into()?)?)
}

fn save_message<S: Store>(store: &mut S, message: Content) -> Result<(), Error<S::Error>> {
async fn save_message<S: Store>(
store: &mut S,
push_service: &mut HyperPushService,
message: Content,
) -> Result<(), Error<S::Error>> {
// derive the thread from the message type
let thread = Thread::try_from(&message)?;

// only save DataMessage and SynchronizeMessage (sent)
let message = match message.body {
ContentBody::NullMessage(_) => Some(message),
ContentBody::DataMessage(ref data_message)
ContentBody::DataMessage(
ref data_message @ DataMessage {
ref profile_key, ..
},
)
| ContentBody::SynchronizeMessage(SyncMessage {
sent:
Some(sync_message::Sent {
message: Some(ref data_message),
message:
Some(
ref data_message @ DataMessage {
ref profile_key, ..
},
),
..
}),
..
}) => match data_message {
DataMessage {
profile_key: Some(profile_key_bytes),
delete:
Some(Delete {
target_sent_timestamp: Some(ts),
}),
..
} => {
// update recipient profile key
if let Ok(profile_key_bytes) = profile_key_bytes.clone().try_into() {
let sender_uuid = message.metadata.sender.uuid;
let profile_key = ProfileKey::create(profile_key_bytes);
debug!("inserting profile key for {sender_uuid}");
store.upsert_profile_key(&sender_uuid, profile_key)?;
}) => {
// update recipient profile key if changed
if let Some(profile_key_bytes) = profile_key.clone().and_then(|p| p.try_into().ok()) {
let sender_uuid = message.metadata.sender.uuid;
let profile_key = ProfileKey::create(profile_key_bytes);
debug!("inserting profile key for {sender_uuid}");

// Either:
// - insert a new contact with the profile information
// - update the contact if the profile key has changed
// TODO: mark this contact as "created by us" maybe to know whether we should update it or not
if store.contact_by_id(&sender_uuid)?.is_none()
|| !store
.profile_key(&sender_uuid)?
.is_some_and(|p| p.bytes == profile_key.bytes)
{
let encrypted_profile = push_service
.retrieve_profile_by_id(sender_uuid.into(), Some(profile_key))
.await?;
let profile_cipher = ProfileCipher::from(profile_key);
let decrypted_profile = encrypted_profile.decrypt(profile_cipher).unwrap();

let contact = Contact {
uuid: sender_uuid,
phone_number: None,
name: decrypted_profile
.name
// FIXME: this assumes [firstname] [lastname]
.map(|pn| {
if let Some(family_name) = pn.family_name {
format!("{} {}", pn.given_name, family_name)
} else {
pn.given_name
}
})
.unwrap_or_default(),
profile_key: profile_key.bytes.to_vec(),
color: None,
blocked: false,
expire_timer: 0,
inbox_position: 0,
archived: false,
avatar: None,
verified: Verified::default(),
};

info!("saved contact after seeing {sender_uuid} for the first time");
store.save_contact(&contact)?;
}

// replace an existing message by an empty NullMessage
if let Some(mut existing_msg) = store.message(&thread, *ts)? {
existing_msg.metadata.sender.uuid = Uuid::nil();
existing_msg.body = NullMessage::default().into();
store.save_message(&thread, existing_msg)?;
debug!("message in thread {thread} @ {ts} deleted");
None
} else {
warn!("could not find message to delete in thread {thread} @ {ts}");
None
store.upsert_profile_key(&sender_uuid, profile_key)?;
}

match data_message {
DataMessage {
delete:
Some(Delete {
target_sent_timestamp: Some(ts),
}),
..
} => {
// replace an existing message by an empty NullMessage
if let Some(mut existing_msg) = store.message(&thread, *ts)? {
existing_msg.metadata.sender.uuid = Uuid::nil();
existing_msg.body = NullMessage::default().into();
store.save_message(&thread, existing_msg)?;
debug!("message in thread {thread} @ {ts} deleted");
None
} else {
warn!("could not find message to delete in thread {thread} @ {ts}");
None
}
}
_ => Some(message),
}
_ => Some(message),
},
}
ContentBody::EditMessage(EditMessage {
target_sent_timestamp: Some(ts),
data_message: Some(data_message),
Expand Down Expand Up @@ -1088,8 +1153,8 @@ fn save_message<S: Store>(store: &mut S, message: Content) -> Result<(), Error<S
call_event: Some(_),
..
}) => Some(message),
ContentBody::SynchronizeMessage(s) => {
debug!("skipping saving sync message without interesting fields: {s:?}");
ContentBody::SynchronizeMessage(_) => {
debug!("skipping saving sync message without interesting fields");
None
}
ContentBody::ReceiptMessage(_) => {
Expand Down
11 changes: 4 additions & 7 deletions presage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub trait ContentsStore {
/// or [Group::disappearing_messages_timer].
fn expire_timer(&self, thread: &Thread) -> Result<Option<u32>, Self::ContentsStoreError> {
match thread {
Thread::Contact(uuid) => Ok(self.contact_by_id(*uuid)?.map(|c| c.expire_timer)),
Thread::Contact(uuid) => Ok(self.contact_by_id(uuid)?.map(|c| c.expire_timer)),
Thread::Group(key) => Ok(self
.group(*key)?
.and_then(|g| g.disappearing_messages_timer)
Expand All @@ -175,17 +175,14 @@ pub trait ContentsStore {
/// Clear all saved synchronized contact data
fn clear_contacts(&mut self) -> Result<(), Self::ContentsStoreError>;

/// Replace all contact data
fn save_contacts(
&mut self,
contacts: impl Iterator<Item = Contact>,
) -> Result<(), Self::ContentsStoreError>;
/// Save a contact
fn save_contact(&mut self, contacts: &Contact) -> Result<(), Self::ContentsStoreError>;

/// Get an iterator on all stored (synchronized) contacts
fn contacts(&self) -> Result<Self::ContactsIter, Self::ContentsStoreError>;

/// Get contact data for a single user by its [Uuid].
fn contact_by_id(&self, id: Uuid) -> Result<Option<Contact>, Self::ContentsStoreError>;
fn contact_by_id(&self, id: &Uuid) -> Result<Option<Contact>, Self::ContentsStoreError>;

/// Delete all cached group data
fn clear_groups(&mut self) -> Result<(), Self::ContentsStoreError>;
Expand Down

0 comments on commit 676cf3a

Please sign in to comment.