Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Process previously failed backfill events in the background #15585

Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
fd26164
Process previously failed backfill events in the background
MadLittleMods May 12, 2023
c5dc746
Add changelog
MadLittleMods May 12, 2023
8fc47d8
Add consideration
MadLittleMods May 12, 2023
b5d95f7
Fix lints
MadLittleMods May 12, 2023
ebc93be
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 16, 2023
e13f5a9
Always check for failed attempts
MadLittleMods May 16, 2023
70f5911
Add comments and concern about maybe queue
MadLittleMods May 16, 2023
45934fe
Process all failed events as a sequential task in the background
MadLittleMods May 16, 2023
b1998d7
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 16, 2023
93de856
Better comments
MadLittleMods May 16, 2023
631d7db
Add test for `separate_event_ids_with_failed_pull_attempts`
MadLittleMods May 16, 2023
beeccc3
Avoid doing extra work if the list is empty
MadLittleMods May 17, 2023
7eabc60
Make sure to retain the same order they were given in case the depth …
MadLittleMods May 17, 2023
7583c2c
Add comments why OrderedDict
MadLittleMods May 17, 2023
e101318
Make test more robust around ordering
MadLittleMods May 17, 2023
899fc34
Add test description
MadLittleMods May 17, 2023
b5aec4f
Same order separated results
MadLittleMods May 17, 2023
6edd126
Refactor to get_event_ids_with_failed_pull_attempts(...)
MadLittleMods May 17, 2023
d4b8ff7
Update comment doc
MadLittleMods May 17, 2023
6a0ec9d
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 18, 2023
d843557
Use List
MadLittleMods May 18, 2023
75bec52
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 23, 2023
c4e1533
Trace differentiaed events
MadLittleMods May 23, 2023
ec230a3
Prefer plain language
MadLittleMods May 24, 2023
22a69be
Use a `set` for efficient lookups
MadLittleMods May 24, 2023
65febed
Add some context
MadLittleMods May 24, 2023
6474b4e
Use dedicated `partition` function to separate list
MadLittleMods May 24, 2023
15527f7
Add context for why source order for MSC2716
MadLittleMods May 24, 2023
d59615f
Add sanity check test that failed pull attempt events are still proce…
MadLittleMods May 24, 2023
95ffa7c
Use obvious type
MadLittleMods May 25, 2023
50acf6a
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15585.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Process previously failed backfill events in the background to avoid blocking requests for something that is bound to fail again.
65 changes: 58 additions & 7 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ async def _process_pulled_events(
[event.event_id for event in events]
)

new_events = []
new_events: List[EventBase] = []
for event in events:
event_id = event.event_id

Expand Down Expand Up @@ -895,12 +895,63 @@ async def _process_pulled_events(
str(len(new_events)),
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@trace
async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)

# Check if we've already tried to process these events at some point in the
# past. We aren't concerned with the expontntial backoff here, just whether it
# has failed to be processed before.
event_ids_with_failed_pull_attempts = (
await self._store.get_event_ids_with_failed_pull_attempts(
[event.event_id for event in new_events]
)
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

# Process previously failed backfill events in the background to not waste
# time on something that is bound to fail again.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
events_with_failed_pull_attempts = [
event
for event in new_events
if event.event_id in event_ids_with_failed_pull_attempts
]
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts",
str(event_ids_with_failed_pull_attempts),
)
set_tag(
SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length",
str(len(events_with_failed_pull_attempts)),
)
if len(events_with_failed_pull_attempts) > 0:
run_as_background_process(
"_process_new_pulled_events_with_failed_pull_attempts",
_process_new_pulled_events,
events_with_failed_pull_attempts,
)

# We can optimistically try to process and wait for the event to be fully
# persisted if we've never tried before.
fresh_events = [
event
for event in new_events
if event.event_id not in event_ids_with_failed_pull_attempts
]
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
str([event.event_id for event in fresh_events]),
)
set_tag(
SynapseTags.RESULT_PREFIX + "fresh_events.length",
str(len(fresh_events)),
)
if len(fresh_events) > 0:
await _process_new_pulled_events(fresh_events)

@trace
@tag_args
Expand Down
37 changes: 37 additions & 0 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,43 @@ def _record_event_failed_pull_attempt_upsert_txn(

txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

@trace
async def get_event_ids_with_failed_pull_attempts(
self, event_ids: List[str]
) -> List[str]:
"""
Filter the given list of `event_ids` and return events which have any failed
pull attempts.

Args:
event_ids: A list of events to filter down.

Returns:
A filtered down list of `event_ids` that have previous failed pull attempts
(order is maintained).
"""

rows = await self.db_pool.simple_select_many_batch(
table="event_failed_pull_attempts",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=("event_id",),
desc="get_event_ids_with_failed_pull_attempts",
)
event_ids_with_failed_pull_attempts_from_database = [
str(row["event_id"]) for row in rows
]
# We want to maintain the order of the given `event_ids` so we re-construct the
# list since there is no gurantees from the database implementation/query.
event_ids_with_failed_pull_attempts = [
event_id
for event_id in event_ids
if event_id in event_ids_with_failed_pull_attempts_from_database
]
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

return event_ids_with_failed_pull_attempts

@trace
async def get_event_ids_to_not_pull_from_backoff(
self,
Expand Down
49 changes: 49 additions & 0 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,55 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["insertion_eventA"])

def test_get_event_ids_with_failed_pull_attempts(self) -> None:
"""
Test to make sure we properly get event_ids based on whether they have any
failed pull attempts.
"""
# Create the room
user_id = self.register_user("alice", "test")
tok = self.login("alice", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

# We purposely record the failed pull attempt for `$c_failed_event_id3` first to
# make sure we return results in the order of the `event_ids` passed in instead
# of the order in which we find things in the database or the unordered
# collections we might accidentally use. They also purposely have reverse
# prefixed a-c in front to better test dubious sorting happening somewhere.
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$a_failed_event_id3", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$c_failed_event_id1", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$b_failed_event_id2", "fake cause"
)
)

event_ids_with_failed_pull_attempts = self.get_success(
self.store.get_event_ids_with_failed_pull_attempts(
event_ids=[
"$c_failed_event_id1",
"$c_fresh_event_id1",
"$b_failed_event_id2",
"$b_fresh_event_id2",
"$a_failed_event_id3",
"$a_fresh_event_id3",
]
)
)

self.assertEqual(
event_ids_with_failed_pull_attempts,
["$c_failed_event_id1", "$b_failed_event_id2", "$a_failed_event_id3"],
)

def test_get_event_ids_to_not_pull_from_backoff(self) -> None:
"""
Test to make sure only event IDs we should backoff from are returned.
Expand Down