Skip to content

Commit

Permalink
Mutate a runInteraction->simple_upsert_txn into a simple_upsert for _…
Browse files Browse the repository at this point in the history
…insert_graph_receipt
  • Loading branch information
realtyem committed Sep 12, 2023
1 parent 07234cb commit 4964815
Showing 1 changed file with 9 additions and 17 deletions.
26 changes: 9 additions & 17 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,27 +795,21 @@ async def insert_receipt(
now - event_ts,
)

await self.db_pool.runInteraction(
"insert_graph_receipt",
self._insert_graph_receipt_txn,
await self._insert_graph_receipt(
room_id,
receipt_type,
user_id,
event_ids,
thread_id,
data,
# Use READ_COMMITTED to avoid 'could not serialize access due to concurrent
# update' Postgres errors which lead to rollbacks and re-dos.
isolation_level=IsolationLevel.READ_COMMITTED,
)

max_persisted_id = self._receipts_id_gen.get_current_token()

return stream_id, max_persisted_id

def _insert_graph_receipt_txn(
async def _insert_graph_receipt(
self,
txn: LoggingTransaction,
room_id: str,
receipt_type: str,
user_id: str,
Expand All @@ -825,13 +819,6 @@ def _insert_graph_receipt_txn(
) -> None:
assert self._can_write_to_receipts

txn.call_after(
self._get_receipts_for_user_with_orderings.invalidate,
(user_id, receipt_type),
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
Expand All @@ -843,8 +830,8 @@ def _insert_graph_receipt_txn(
else:
keyvalues["thread_id"] = thread_id

self.db_pool.simple_upsert_txn(
txn,
await self.db_pool.simple_upsert(
desc="insert_graph_receipt",
table="receipts_graph",
keyvalues=keyvalues,
values={
Expand All @@ -854,6 +841,11 @@ def _insert_graph_receipt_txn(
where_clause=where_clause,
)

self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))

# FIXME: This shouldn't invalidate the whole cache
self._get_linearized_receipts_for_room.invalidate((room_id,))


class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
Expand Down

0 comments on commit 4964815

Please sign in to comment.