Skip to content

Commit

Permalink
Rewrite database subsetting
Browse files Browse the repository at this point in the history
Previously, subsetting involved creating RowSubset objects, which were
then passed to a function that created a database from them.

Now, you create the new database first as a context manager, and use it
to produce RowSubset objects.

The new approach means the context manager can track its own subsets, so
you don't need to explicitly list them.
  • Loading branch information
Nicholas FitzRoy-Dale committed Oct 18, 2023
1 parent fdbccdc commit d14adf4
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 178 deletions.
24 changes: 18 additions & 6 deletions rime/graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from .event import MessageEvent, MediaEvent
from .mergedcontact import merge_contacts
from .anonymise import Anonymiser
from .subset import Subsetter
from .subset import DeviceSubsetter, ProviderSubsetter, SubsetOptions, SubsetFillOption
from .device import Device
from .provider import Provider
from .event import Event, MessageSession
Expand Down Expand Up @@ -495,7 +495,8 @@ def _create_subset_prepare_device(rime, target):
return device, new_device


def _create_subset_populate_device(rime, device, new_device, events_filter_obj, contacts_filter_obj):
def _create_subset_populate_device(rime, opts: SubsetOptions, device, new_device, events_filter_obj,
contacts_filter_obj):
"""
Create a subset of 'targets' with events and contacts matching the supplied filters.
Expand All @@ -504,7 +505,7 @@ def _create_subset_populate_device(rime, device, new_device, events_filter_obj,
Raises CreateSubsetError for any error we might expect callers to reasonably deal with.
May raise anything else if something goes wrong (e.g. while a particular provider is performing a subset).
"""
subsetter = Subsetter(new_device.fs)
device_subsetter = DeviceSubsetter(new_device.fs, opts)

# Find and remember the contacts subset.
contacts_by_provider = {
Expand All @@ -522,12 +523,16 @@ def _create_subset_populate_device(rime, device, new_device, events_filter_obj,
else:
contacts_for_provider = []

ebp.provider.subset(subsetter, ebp.events, contacts_for_provider)
provider_subsetter = ProviderSubsetter(device_subsetter, opts)

ebp.provider.subset(provider_subsetter, ebp.events, contacts_for_provider)

# Also subset contacts-only providers with no subsetted events.
for provider_name in unsubsetted_contact_providers:
provider = device.providers[provider_name]
provider.subset(subsetter, [], contacts_by_provider[provider_name])

provider_subsetter = ProviderSubsetter(device_subsetter, opts)
provider.subset(provider_subsetter, [], contacts_by_provider[provider_name])

new_device.reload_providers()

Expand All @@ -543,6 +548,12 @@ def resolve_create_subset(rime, info, targets, eventsFilter, contactsFilter, ano

devices = [] # list of (old device, new device)

# Create default subset options. TODO: Expose these to GraphQL.
opts = SubsetOptions(
fill=SubsetFillOption.UNUSED_DBS_AND_TABLES,
anonymise=anonymise,
)

async def _create_subset_impl(bg_rime):
# TODO: Error reporting, status updates
errorMessage = None
Expand All @@ -557,7 +568,8 @@ async def _create_subset_impl(bg_rime):

try:
for old_device, new_device in devices:
_create_subset_populate_device(bg_rime, old_device, new_device, events_filter_obj, contacts_filter_obj)
_create_subset_populate_device(bg_rime, opts, old_device, new_device, events_filter_obj,
contacts_filter_obj)
if anonymiser:
for provider in new_device.providers.values():
anonymiser.anonymise_device_provider(new_device, provider)
Expand Down
14 changes: 14 additions & 0 deletions rime/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from abc import ABC, abstractmethod

from .media import MediaData
from .filesystem.base import File


class Provider(ABC):
Expand All @@ -28,6 +29,13 @@ def subset(self, subsetter, events, contacts):
"""
return None

@abstractmethod
def all_files(self) -> list[File]:
"""
Return a list of all files associated with the app.

Declared abstract: every concrete Provider subclass must supply its own
all_files(). The body here simply returns an empty list.
"""
return []

@abstractmethod
def search_events(self, device, filter_):
"""
Expand Down Expand Up @@ -57,6 +65,12 @@ def find_providers(fs) -> dict[str, Provider]:
providers_dict = {}
for provider in Provider.__subclasses__():
instance = provider.from_filesystem(fs)

# Sanity check the providers...
if not provider.NAME or not provider.FRIENDLY_NAME:
raise ValueError(f'Provider {provider.__name__} has no NAME or FRIENDLY_NAME')

# ... and store them.
if instance:
providers_dict[provider.NAME] = instance

Expand Down
36 changes: 17 additions & 19 deletions rime/providers/androidcontacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,25 +138,23 @@ def search_contacts(self, contacts_filter):
}

def subset(self, subsetter, events: Iterable[Event], contacts: Iterable[Contact]):
rows_contacts = subsetter.row_subset('contacts', '_id')
rows_raw_contacts = subsetter.row_subset('raw_contacts', '_id')
rows_data = subsetter.row_subset('data', 'raw_contact_id')
mimetypes = subsetter.complete_table('mimetypes')

for contact in contacts:
if contact.providerName != self.NAME:
continue

rows_contacts.add(contact.local_id)
rows_raw_contacts.update(contact.provider_data.raw_contact_row_ids)
rows_data.update(contact.provider_data.raw_contact_row_ids)

subsetter.create_db_and_copy_rows(self.conn, self.DB_PATH, [
rows_contacts,
rows_raw_contacts,
rows_data,
mimetypes
])
with subsetter.db_subset(src_conn=self.conn, new_db_pathname=self.DB_PATH) as db_subset:
rows_contacts = db_subset.row_subset('contacts', '_id')
rows_raw_contacts = db_subset.row_subset('raw_contacts', '_id')
rows_data = db_subset.row_subset('data', 'raw_contact_id')
db_subset.complete_table('mimetypes')

for contact in contacts:
if contact.providerName != self.NAME:
continue

rows_contacts.add(contact.local_id)
rows_raw_contacts.update(contact.provider_data.raw_contact_row_ids)
rows_data.update(contact.provider_data.raw_contact_row_ids)

def all_files(self):
# TODO: not implemented yet -- placeholder that reports no files.
return []

def _get_mime_types(self):
"""
Expand Down
4 changes: 4 additions & 0 deletions rime/providers/androidgenericmedia.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def subset(self, subsetter, events, contacts):
"""
return None

def all_files(self):
# TODO: not implemented yet -- placeholder that reports no files.
return []

def search_events(self, device, filter_):
"""
Search for events matching ``filter_``, which is an EventFilter.
Expand Down
29 changes: 14 additions & 15 deletions rime/providers/androidtelephony.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,22 @@ def subset(self, subsetter, events, contacts):
"""
Create a subset using the given events and contacts.
"""
rows_sms = subsetter.row_subset("sms", "_id")
rows_threads = subsetter.row_subset('threads', '_id')
rows_address = subsetter.row_subset('canonical_addresses', '_id')
with subsetter.db_subset(src_conn=self.db, new_db_pathname=self.MMSSMS_DB) as subset_db:
rows_sms = subset_db.row_subset("sms", "_id")
rows_threads = subset_db.row_subset('threads', '_id')
rows_address = subset_db.row_subset('canonical_addresses', '_id')

rows_address.update(
contact.local_id for contact in contacts if contact.providerName == self.NAME
)
rows_threads.update(
event.provider_data.threads_table_id for event in events if event.provider.NAME == self.NAME
)
rows_sms.update(event.id_ for event in events if event.provider.NAME == self.NAME)
rows_address.update(
contact.local_id for contact in contacts if contact.providerName == self.NAME
)
rows_threads.update(
event.provider_data.threads_table_id for event in events if event.provider.NAME == self.NAME
)
rows_sms.update(event.id_ for event in events if event.provider.NAME == self.NAME)

subsetter.create_db_and_copy_rows(self.db, self.MMSSMS_DB, [
rows_sms,
rows_threads,
rows_address,
])
def all_files(self):
# TODO: not implemented yet -- placeholder that reports no files.
return []

@classmethod
def from_filesystem(cls, fs):
Expand Down
110 changes: 52 additions & 58 deletions rime/providers/androidwhatsapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,64 +450,58 @@ def subset(self, subsetter, events: Iterable[Event], contacts: Iterable[Contact]
Create a WhatsApp subset using the provided events and contacts.
"""
# Copy the contacts
rows_wa_contacts = subsetter.row_subset("wa_contacts", "_id")
rows_wa_contacts.update(contact.provider_data.id_ for contact in contacts)

# Copy session participants
rows_group_participant_user = subsetter.row_subset("group_participant_user", "_id")

# Copy events
rows_message = subsetter.row_subset("message", "_id")
rows_message_media = subsetter.row_subset("message_media", "message_row_id")
rows_message_details = subsetter.row_subset("message_details", "message_row_id")
rows_jid = subsetter.row_subset("jid", "_id")
rows_chat = subsetter.row_subset("chat", "_id")

for event in events:
# Reject if it's not one of ours.
if not isinstance(event, MessageEvent) or event.provider.NAME != self.NAME:
continue

wa_message = event.provider_data

rows_message.add(wa_message.message_row_id)
if event.sender and event.sender.provider_data:
rows_jid.update(jid_contact.id_ for jid_contact in event.sender.provider_data.jid_contacts)

if event.session:
wa_session = event.session.provider_data
rows_group_participant_user.update(wa_session.group_participant_user_ids)
if wa_session.group_user_id:
rows_wa_contacts.add(wa_session.group_user_id)
if wa_session.group_jid_row_id:
rows_jid.add(wa_session.group_jid_row_id)

rows_message_details.add(wa_message.message_row_id)
rows_chat.add(wa_message.chat_row_id)
rows_message_media.add(wa_message.message_row_id)

# Write the message db.
subsetter.create_db_and_copy_rows(self.msgdb, self.MESSAGE_DB, [
rows_message,
rows_message_details,
rows_message_media,
rows_jid,
rows_chat,
rows_group_participant_user,
])

# Write the contacts DB.
subsetter.create_db_and_copy_rows(self.wadb, self.WA_DB, [rows_wa_contacts])

# copy media by copying each named file.
media_table = Table('message_media')
query = Query.from_(media_table) \
.select('file_path') \
.where(media_table.message_row_id.isin(rows_message_media.rows))

for row in self.msgdb.execute(query.get_sql()):
pathname = self._media_path(row[0])
subsetter.copy_file(self.fs.open(pathname), pathname)
with subsetter.db_subset(src_conn=self.wadb, new_db_pathname=self.WA_DB) as subset_wadb, \
subsetter.db_subset(src_conn=self.msgdb, new_db_pathname=self.MESSAGE_DB) as subset_msgdb:

rows_wa_contacts = subset_wadb.row_subset("wa_contacts", "_id")
rows_wa_contacts.update(contact.provider_data.id_ for contact in contacts)

# Copy session participants
rows_group_participant_user = subset_msgdb.row_subset("group_participant_user", "_id")

# Copy events
rows_message = subset_msgdb.row_subset("message", "_id")
rows_message_media = subset_msgdb.row_subset("message_media", "message_row_id")
rows_message_details = subset_msgdb.row_subset("message_details", "message_row_id")
rows_jid = subset_msgdb.row_subset("jid", "_id")
rows_chat = subset_msgdb.row_subset("chat", "_id")

for event in events:
# Reject if it's not one of ours.
if not isinstance(event, MessageEvent) or event.provider.NAME != self.NAME:
continue

wa_message = event.provider_data

rows_message.add(wa_message.message_row_id)
if event.sender and event.sender.provider_data:
rows_jid.update(jid_contact.id_ for jid_contact in event.sender.provider_data.jid_contacts)

if event.session:
wa_session = event.session.provider_data
rows_group_participant_user.update(wa_session.group_participant_user_ids)
if wa_session.group_user_id:
rows_wa_contacts.add(wa_session.group_user_id)
if wa_session.group_jid_row_id:
rows_jid.add(wa_session.group_jid_row_id)

rows_message_details.add(wa_message.message_row_id)
rows_chat.add(wa_message.chat_row_id)
rows_message_media.add(wa_message.message_row_id)

# copy media by copying each named file.
media_table = Table('message_media')
query = Query.from_(media_table) \
.select('file_path') \
.where(media_table.message_row_id.isin(rows_message_media.rows))

for row in self.msgdb.execute(query.get_sql()):
pathname = self._media_path(row[0])
subsetter.copy_file(self.fs.open(pathname), pathname)

def all_files(self):
# TODO: not implemented yet -- placeholder that reports no files.
return []

@classmethod
def from_filesystem(cls, fs):
Expand Down
53 changes: 27 additions & 26 deletions rime/providers/imessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,32 +152,33 @@ def search_contacts(self, filter_):
def subset(self, subsetter, events: Iterable[Event], contacts: Iterable[Contact]):
"""
"""
handle_rows = subsetter.row_subset('handle', 'ROWID')
handle_rows.update(contact.provider_data.row_id for contact in contacts if contact.providerName == self.NAME)

message_rows = subsetter.row_subset('message', 'ROWID')
chat_rows = subsetter.row_subset('chat', 'ROWID')
chat_message_join_rows = subsetter.row_subset('chat_message_join', 'chat_id')
chat_handle_join_rows = subsetter.row_subset('chat_handle_join', 'chat_id')

for event in events:
if not isinstance(event, MessageEvent) or event.provider != self:
continue

message_rows.add(event.provider_data.message_row_id)
chat_rows.add(event.provider_data.chat_row_id)
chat_message_join_rows.add(event.provider_data.chat_row_id)
chat_handle_join_rows.add(event.provider_data.chat_row_id)
if event.session:
handle_rows.update(contact.provider_data.row_id for contact in event.session.participants)

subsetter.create_db_and_copy_rows(self.conn, self.MESSAGE_DB, [
handle_rows,
message_rows,
chat_rows,
chat_message_join_rows,
chat_handle_join_rows,
])
with subsetter.db_subset(src_conn=self.conn, new_db_pathname=self.MESSAGE_DB) as subset_db:
handle_rows = subset_db.row_subset('handle', 'ROWID')
handle_rows.update(
contact.provider_data.row_id
for contact in contacts
if contact.providerName == self.NAME
)

message_rows = subset_db.row_subset('message', 'ROWID')
chat_rows = subset_db.row_subset('chat', 'ROWID')
chat_message_join_rows = subset_db.row_subset('chat_message_join', 'chat_id')
chat_handle_join_rows = subset_db.row_subset('chat_handle_join', 'chat_id')

for event in events:
if not isinstance(event, MessageEvent) or event.provider != self:
continue

message_rows.add(event.provider_data.message_row_id)
chat_rows.add(event.provider_data.chat_row_id)
chat_message_join_rows.add(event.provider_data.chat_row_id)
chat_handle_join_rows.add(event.provider_data.chat_row_id)
if event.session:
handle_rows.update(contact.provider_data.row_id for contact in event.session.participants)

def all_files(self):
# TODO: not implemented yet -- placeholder that reports no files.
return []

@classmethod
def from_filesystem(cls, fs):
Expand Down
Loading

0 comments on commit d14adf4

Please sign in to comment.