diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3a455f011b05..a074c439895e 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -795,27 +795,21 @@ async def insert_receipt( now - event_ts, ) - await self.db_pool.runInteraction( - "insert_graph_receipt", - self._insert_graph_receipt_txn, + await self._insert_graph_receipt( room_id, receipt_type, user_id, event_ids, thread_id, data, - # Use READ_COMMITTED to avoid 'could not serialize access due to concurrent - # update' Postgres errors which lead to rollbacks and re-dos. - isolation_level=IsolationLevel.READ_COMMITTED, ) max_persisted_id = self._receipts_id_gen.get_current_token() return stream_id, max_persisted_id - def _insert_graph_receipt_txn( + async def _insert_graph_receipt( self, - txn: LoggingTransaction, room_id: str, receipt_type: str, user_id: str, @@ -825,13 +819,6 @@ def _insert_graph_receipt_txn( ) -> None: assert self._can_write_to_receipts - txn.call_after( - self._get_receipts_for_user_with_orderings.invalidate, - (user_id, receipt_type), - ) - # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) - keyvalues = { "room_id": room_id, "receipt_type": receipt_type, @@ -843,8 +830,8 @@ def _insert_graph_receipt_txn( else: keyvalues["thread_id"] = thread_id - self.db_pool.simple_upsert_txn( - txn, + await self.db_pool.simple_upsert( + desc="insert_graph_receipt", table="receipts_graph", keyvalues=keyvalues, values={ @@ -854,6 +841,11 @@ def _insert_graph_receipt_txn( where_clause=where_clause, ) + self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type)) + + # FIXME: This shouldn't invalidate the whole cache + self._get_linearized_receipts_for_room.invalidate((room_id,)) + class ReceiptsBackgroundUpdateStore(SQLBaseStore): POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"