This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix historical messages backfilling in random order on remote homeservers (MSC2716) #11114

Merged
Changes from 1 commit

Commits (55)
f30302d
Scratch debugging why events appear out of order on remote homeservers
MadLittleMods Oct 18, 2021
438e222
Use OrderedDict to gurantee order returned is the same as we were bui…
MadLittleMods Oct 18, 2021
4983739
Avoid constant missing prev_event fetching while backfilling
MadLittleMods Oct 19, 2021
a64bb2e
Add changelog
MadLittleMods Oct 19, 2021
260ca06
Some more trials of trying to get many many events to backfill in ord…
MadLittleMods Oct 19, 2021
886071b
Fix backfill not picking up batch events connected to non-base insert…
MadLittleMods Oct 20, 2021
477c15d
Some more debug logging
MadLittleMods Oct 21, 2021
4191f56
Remove fake prev events from historical state chain
MadLittleMods Oct 21, 2021
f39c1da
Remove debug logging
MadLittleMods Oct 21, 2021
7da8012
Remove extra event info
MadLittleMods Oct 21, 2021
69dfa16
Move to sorting the backfill events in the existing sorted
MadLittleMods Oct 21, 2021
83474d9
Put MSC2716 backfill logic behind experimental feature flag
MadLittleMods Oct 21, 2021
1263c7e
Remove unused import
MadLittleMods Oct 21, 2021
ee47878
Fix mypy lints
MadLittleMods Oct 21, 2021
5bfde7b
Merge branch 'master' into madlittlemods/return-historical-events-in-…
MadLittleMods Oct 21, 2021
2fbe3f1
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Oct 21, 2021
1d3f417
Revert back to string interpolation for SQL boolean value
MadLittleMods Oct 21, 2021
4a12304
Put empty prev_events behind new room version
MadLittleMods Oct 28, 2021
9a6d8fa
WIP: Don't include the event we branch from
MadLittleMods Oct 29, 2021
3e09d49
Revert "WIP: Don't include the event we branch from"
MadLittleMods Oct 29, 2021
5afc264
WIP: Sort events topologically when we receive them over backfill
MadLittleMods Oct 29, 2021
6ea263b
Revert "WIP: Sort events topologically when we receive them over back…
MadLittleMods Oct 29, 2021
3d387f9
WIP: Sort events topologically when we receive them over backfill
MadLittleMods Oct 29, 2021
fb8e281
Fix direction of fake edges
MadLittleMods Oct 29, 2021
c772b35
Implement backfill in handler so we can do fetching later
MadLittleMods Oct 29, 2021
e0ff66d
Fix backfill being able to cleanly branch into history and back to "l…
MadLittleMods Oct 29, 2021
76d454f
Some backfill receive sorting fixes but not using it yet
MadLittleMods Oct 30, 2021
3529449
Fix lints
MadLittleMods Oct 30, 2021
321f9ea
Move back to the old get_backfill_events and simplify backfill.
MadLittleMods Nov 2, 2021
15c3282
Remove the new backfill implementation and pull some good parts of th…
MadLittleMods Nov 2, 2021
5db717a
Always process marker events regardless if backfilled
MadLittleMods Nov 3, 2021
e96fd5c
Add comment docs
MadLittleMods Nov 3, 2021
f3b7b3e
Add better explanatory comment
MadLittleMods Nov 3, 2021
7f2105a
Remove topological sort when receiving backfill events
MadLittleMods Nov 3, 2021
246278e
Fix lints
MadLittleMods Nov 3, 2021
ec35be5
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Nov 3, 2021
bc0ba8c
Protect from no auth events for non-existent provided prev_event
MadLittleMods Nov 3, 2021
363aed6
Revert unused refactor to get PDU raw
MadLittleMods Nov 3, 2021
d771fbd
Only run the tests package to get streaming Complement output
MadLittleMods Nov 11, 2021
b559e23
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Nov 11, 2021
6b64184
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Dec 9, 2021
1d00043
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Dec 16, 2021
b071426
Plumb allow_no_prev_events through for MSC2716
MadLittleMods Dec 16, 2021
ec33a40
Make the historical events float separately from the state chain
MadLittleMods Dec 16, 2021
b99efa8
Plumb allow_no_prev_events through create_and_send_nonmember_event
MadLittleMods Dec 16, 2021
3810ae1
Clarify comments
MadLittleMods Dec 16, 2021
df2a152
Fix NPE when trying to grab event from wrong roomId (fix sytest)
MadLittleMods Dec 16, 2021
cc4eb72
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Jan 14, 2022
47590bb
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Feb 4, 2022
a38befa
Some review optimizations
MadLittleMods Feb 4, 2022
033360a
Fix lints
MadLittleMods Feb 4, 2022
3f22e42
Fix unused lint
MadLittleMods Feb 4, 2022
e5670ff
Fix lints
MadLittleMods Feb 4, 2022
023bd3e
Don't run MSC2716 complement tests for everyone
MadLittleMods Feb 7, 2022
b3fcffb
Use same txn iteration optimization
MadLittleMods Feb 7, 2022
17 changes: 10 additions & 7 deletions synapse/events/utils.py
@@ -283,13 +283,13 @@ def format_event_for_client_v1(d: JsonDict) -> JsonDict:

def format_event_for_client_v2(d: JsonDict) -> JsonDict:
drop_keys = (
"auth_events",
"prev_events",
"hashes",
"signatures",
"depth",
"origin",
"prev_state",
# "auth_events",
# "prev_events",
# "hashes",
# "signatures",
# "depth",
# "origin",
# "prev_state",
)
for key in drop_keys:
d.pop(key, None)
@@ -340,6 +340,9 @@ def serialize_event(

d["event_id"] = e.event_id

# TODO: Remove
d["stream_ordering"] = e.internal_metadata.stream_ordering

if "age_ts" in d["unsigned"]:
d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"]
del d["unsigned"]["age_ts"]
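For context, the hunk above comments out the keys that `format_event_for_client_v2` normally strips, so federation-level fields (and the temporary debug `stream_ordering` added in `serialize_event`) remain visible in client responses while debugging. The normal stripping behaviour amounts to this sketch (function name hypothetical):

```python
def strip_federation_keys(event: dict) -> dict:
    # Keys normally hidden from clients in the v2 event format; the
    # debugging hunk above comments these out so they stay visible.
    drop_keys = (
        "auth_events", "prev_events", "hashes", "signatures",
        "depth", "origin", "prev_state",
    )
    for key in drop_keys:
        event.pop(key, None)  # tolerate absent keys
    return event

print(strip_federation_keys({"type": "m.room.message", "depth": 5}))
# {'type': 'm.room.message'}
```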
10 changes: 5 additions & 5 deletions synapse/handlers/federation.py
@@ -148,14 +148,14 @@ async def _maybe_backfill_inner(
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
)
logger.debug(
logger.info(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
insertion_events_to_be_backfilled,
)

if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremeties found.")
logger.info("Not backfilling as no extremeties found.")
return False

# We only want to paginate if we can actually see the events we'll get,
@@ -203,7 +203,7 @@ async def _maybe_backfill_inner(
redact=False,
check_history_visibility_only=True,
)
logger.debug(
logger.info(
"_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
)

@@ -230,7 +230,7 @@ async def _maybe_backfill_inner(
# much larger factor will result in triggering a backfill request much
# earlier than necessary.
if current_depth - 2 * limit > max_depth:
logger.debug(
logger.info(
"Not backfilling as we don't need to. %d < %d - 2 * %d",
max_depth,
current_depth,
@@ -249,7 +249,7 @@ async def _maybe_backfill_inner(
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
]

logger.debug(
logger.info(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
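The `current_depth - 2 * limit > max_depth` check whose logging is upgraded above decides whether backfill is needed yet. A minimal sketch of that heuristic (function name hypothetical, logic taken directly from the hunk):

```python
def should_backfill(current_depth: int, limit: int, max_depth: int) -> bool:
    # Backfill only once the pagination point (current_depth) is within
    # 2 * limit of the deepest backwards extremity (max_depth); as the
    # comment in the hunk notes, a much larger factor would trigger
    # backfill requests much earlier than necessary.
    return current_depth - 2 * limit <= max_depth

print(should_backfill(current_depth=100, limit=10, max_depth=85))  # True:  80 <= 85
print(should_backfill(current_depth=200, limit=10, max_depth=85))  # False: 180 > 85
```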
9 changes: 8 additions & 1 deletion synapse/handlers/room_batch.py
@@ -353,13 +353,20 @@ async def persist_historical_events(
# Events are sorted by (topological_ordering, stream_ordering)
# where topological_ordering is just depth.
for (event, context) in reversed(events_to_persist):
await self.event_creation_handler.handle_new_client_event(
result_event = await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
event["sender"], app_service_requester.app_service
),
event=event,
context=context,
)
logger.info(
"result_event depth=%s stream_ordering=%s event_id=%s body=%s",
result_event.depth,
result_event.internal_metadata.stream_ordering,
result_event.event_id,
result_event.content.get("body", None),
)

return event_ids

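The loop above walks `events_to_persist` in reverse so events land in ascending `(topological_ordering, stream_ordering)` order. A minimal sketch with hypothetical event tuples (the newest-first build order of the list is an assumption here):

```python
# Hypothetical (depth, stream_ordering, event_id) tuples for a batch of
# historical events, assumed to have been built newest-first.
events_to_persist = [(3, 7, "$m3"), (2, 6, "$m2"), (1, 5, "$m1")]

# Mirror `for (event, context) in reversed(events_to_persist)`: walk the
# list backwards so events are persisted in ascending
# (depth, stream_ordering) order, i.e. oldest first.
persist_order = [event_id for _, _, event_id in reversed(events_to_persist)]
print(persist_order)  # ['$m1', '$m2', '$m3']
```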
94 changes: 77 additions & 17 deletions synapse/storage/databases/main/event_federation.py
@@ -1014,19 +1014,22 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

# Look for the prev_event_id connected to the given event_id
query = """
SELECT depth, prev_event_id FROM event_edges
/* Get the depth of the prev_event_id from the events table */
SELECT depth, stream_ordering, prev_event_id FROM event_edges
/* Get the depth and stream_ordering of the prev_event_id from the events table */
INNER JOIN events
ON prev_event_id = events.event_id
/* Find an event which matches the given event_id */
/* Look for an edge which matches the given event_id */
WHERE event_edges.event_id = ?
AND event_edges.is_state = ?
/* Because we can have many events at the same depth,
* we want to also tie-break and sort on stream_ordering */
ORDER BY depth DESC, stream_ordering DESC
LIMIT ?
"""

# Look for the "insertion" events connected to the given event_id
connected_insertion_event_query = """
SELECT e.depth, i.event_id FROM insertion_event_edges AS i
SELECT e.depth, e.stream_ordering, i.event_id FROM insertion_event_edges AS i
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which points via prev_events to the given event_id */
@@ -1036,7 +1039,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

# Find any batch connections of a given insertion event
batch_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
SELECT e.depth, e.stream_ordering, c.event_id FROM insertion_events AS i
/* Find the batch that connects to the given insertion event */
INNER JOIN batch_events AS c
ON i.next_batch_id = c.batch_id
@@ -1055,26 +1058,68 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
queue = PriorityQueue()

for event_id in event_list:
depth = self.db_pool.simple_select_one_onecol_txn(
event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
keyvalues={"event_id": event_id, "room_id": room_id},
retcol="depth",
retcols=(
"depth",
"stream_ordering",
),
allow_none=True,
)

if depth:
queue.put((-depth, event_id))
if event_lookup_result["depth"]:
queue.put(
(
-event_lookup_result["depth"],
-event_lookup_result["stream_ordering"],
event_id,
)
)

while not queue.empty() and len(event_results) < limit:
try:
_, event_id = queue.get_nowait()
_, _, event_id = queue.get_nowait()
except Empty:
break

if event_id in event_results:
continue

event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
keyvalues={"event_id": event_id},
retcols=["type", "depth", "stream_ordering", "content"],
allow_none=True,
)

event_json_lookup_result = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_json",
keyvalues={"event_id": event_id},
retcol="json",
allow_none=True,
)

ev = db_to_json(event_json_lookup_result)

if event_lookup_result:
logger.info(
"_get_backfill_events: event_results add event_id=%s type=%s depth=%s stream_ordering=%s content=%s",
event_id,
ev["type"],
ev["depth"],
event_lookup_result["stream_ordering"],
ev["content"].get("body", None),
)
else:
logger.info(
"_get_backfill_events: event_results event_id=%s failed to lookup",
event_id,
)

event_results.add(event_id)

# Try and find any potential historical batches of message history.
@@ -1094,8 +1139,15 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
)
for row in connected_insertion_event_id_results:
connected_insertion_event_depth = row[0]
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))
connected_insertion_event_stream_ordering = row[1]
connected_insertion_event = row[2]
queue.put(
(
-connected_insertion_event_depth,
-connected_insertion_event_stream_ordering,
connected_insertion_event,
)
)

# Find any batch connections for the given insertion event
txn.execute(
@@ -1108,18 +1160,26 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
batch_start_event_id_results,
)
for row in batch_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2]))

txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
logger.info(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)

# TODO: Find out why stream_ordering is all out of order compared to
# when we persisted the events

# TODO: We should probably skip adding the event itself if we
# branched off onto the insertion event first above. Need to make this a
# bit smart so it doesn't skip over the event altogether if we're at
# the end of the historical messages.

for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2]))

return event_results

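The core fix in this file is adding `stream_ordering` as a tie-breaker wherever `_get_backfill_events` queues events: both keys are negated so Python's min-heap `PriorityQueue` pops the deepest event first, with `stream_ordering` breaking ties between events at the same depth, matching the `ORDER BY depth DESC, stream_ordering DESC` in the SQL. A standalone sketch of that pattern (event IDs hypothetical):

```python
from queue import Empty, PriorityQueue

# Hypothetical (event_id, depth, stream_ordering) tuples; $a and $b
# share a depth, so only stream_ordering can order them.
events = [("$a", 5, 10), ("$b", 5, 12), ("$c", 7, 3)]

queue = PriorityQueue()
for event_id, depth, stream_ordering in events:
    # Negate both keys so the min-heap pops highest depth first,
    # tie-breaking on highest stream_ordering, as in the PR's
    # `queue.put((-depth, -stream_ordering, event_id))` calls.
    queue.put((-depth, -stream_ordering, event_id))

ordered = []
while not queue.empty():
    try:
        _, _, event_id = queue.get_nowait()
    except Empty:
        break
    ordered.append(event_id)

print(ordered)  # ['$c', '$b', '$a']
```

Without the second key, two events at the same depth would come back in whatever order the heap happened to hold them, which is the "random order on remote homeservers" symptom this PR fixes.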