Batch db updates in Accumulator.Accumulate #196

Closed
DMRobertson wants to merge 12 commits into main from dmr/batch-snapshot-ids

Conversation

@DMRobertson (Contributor, PR author) commented Jul 10, 2023

Suppose a timeline has M new message events and S new state events.
The logic in Accumulator.Accumulate:

for _, ev := range newEvents {
	var replacesNID int64
	// the snapshot ID we assign to this event is unaffected by whether /this/ event is state or not,
	// as this is the before snapshot ID.
	beforeSnapID := snapID
	if ev.IsState {
		// make a new snapshot and update the snapshot ID
		var oldStripped StrippedEvents
		if snapID != 0 {
			oldStripped, err = a.strippedEventsForSnapshot(txn, snapID)
			if err != nil {
				return 0, nil, fmt.Errorf("failed to load stripped state events for snapshot %d: %s", snapID, err)
			}
		}
		newStripped, replacedNID, err := a.calculateNewSnapshot(oldStripped, ev)
		if err != nil {
			return 0, nil, fmt.Errorf("failed to calculateNewSnapshot: %s", err)
		}
		replacesNID = replacedNID
		memNIDs, otherNIDs := newStripped.NIDs()
		newSnapshot := &SnapshotRow{
			RoomID:           roomID,
			MembershipEvents: memNIDs,
			OtherEvents:      otherNIDs,
		}
		if err = a.snapshotTable.Insert(txn, newSnapshot); err != nil {
			return 0, nil, fmt.Errorf("failed to insert new snapshot: %w", err)
		}
		snapID = newSnapshot.SnapshotID
	}
	if err := a.eventsTable.UpdateBeforeSnapshotID(txn, ev.NID, beforeSnapID, replacesNID); err != nil {
		return 0, nil, err
	}
}
will make a lot of small DB queries:

  • up to S selects (line 378),
  • S inserts (line 394),
  • M+S updates (line 399).

It would be good to batch these up, because we occasionally see large gappy state timelines (S big) and we want to minimise the cost of handling these.

Before this change, the new TestLargeGappyStateBlock takes 9.11s; afterwards it takes 6.95s. So a ~25% improvement on my machine. Not as much as I was hoping for, but the effect might be larger in a deployment where the database communication RTT is larger. The bulk of the remaining time is spent in v2Pub.Notify... though I don't understand why???


I had two doubts about this.

Is it even possible to do a batched update?

The answer seems to be "yes". Postgres extends the SQL spec to allow an UPDATE ... FROM... syntax, see https://www.postgresql.org/docs/14/sql-update.html. I cribbed this from jmoiron/sqlx#796 (comment).
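For concreteness, this is the shape of that syntax with literal values plugged in (using the column names from this PR); each row of the VALUES list drives one row's update:

	UPDATE syncv3_events AS e
	SET    before_state_snapshot_id = u.before_state_snapshot_id,
	       event_replaces_nid       = u.event_replaces_nid
	FROM (VALUES (100, 0, 1), (200, 0, 2))
	     AS u(before_state_snapshot_id, event_replaces_nid, event_nid)
	WHERE  e.event_nid = u.event_nid;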

Is it possible to pre-advance the snapshot ID sequence so that the application can insert snapshots with those IDs in one update?

Answer: yes. I used the same technique Synapse does, which is essentially to repeatedly call nextval.
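Roughly like this (the sequence name and helper function here are illustrative, assuming sqlx for the query call; the IDs come back in one round trip but aren't guaranteed to be contiguous):

	// reserveSnapshotIDs fetches n fresh IDs from the snapshots sequence in a
	// single round trip by calling nextval n times.
	func reserveSnapshotIDs(txn *sqlx.Tx, n int) ([]int64, error) {
		var ids []int64
		err := txn.Select(&ids,
			`SELECT nextval('syncv3_snapshots_seq') FROM generate_series(1, $1)`, n)
		return ids, err
	}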

@kegsay (Member) left a comment


The SQL isn't the worst I've seen, but it is using a syntax I'm unfamiliar with. Please link me to some docs explaining what this is doing on the postgres side. Overall, I'm not opposed to something like this, as it basically allows us to batch update in a single query which is nice. That being said, we don't seem to call UpdateBeforeSnapshotIDs yet, so I'm curious where you think this is going to fit in.

}
for j, ev := range chunks[i] {
	if ev.NID != eventNID {
		t.Errorf("chunk %d got wrong event in position %d: got NID %d want NID %d", i, j, ev.NID, eventNID)
@kegsay (Member)

It would be nice to reuse the code in TestChunkify.

@DMRobertson (Contributor, PR author)

In the medium term I'd probably move everything over to the generic version so that there's only one true Chunkify.
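For the record, a sketch of what a single generic Chunkify might look like (the signature is a guess, not what the repo currently has):

	// Chunkify splits items into consecutive chunks of at most chunkSize elements.
	func Chunkify[T any](chunkSize int, items []T) [][]T {
		var chunks [][]T
		for len(items) > chunkSize {
			chunks = append(chunks, items[:chunkSize])
			items = items[chunkSize:]
		}
		if len(items) > 0 {
			chunks = append(chunks, items)
		}
		return chunks
	}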

state/event_table.go (outdated, resolved)
CAST(:event_nid AS bigint)
)
) AS u(before_state_snapshot_id, event_replaces_nid, event_nid)
WHERE e.event_nid = u.event_nid`, chunk)
@kegsay (Member)

I am failing to see what this SQL is doing.

We have a chunk of beforeSnapshotUpdate structs. We are somehow making that addressable as u by passing in chunk, but I'm failing to see any $1.

Once we have this addressable as u, we just do a basic update which I do understand.

@DMRobertson (Contributor, PR author) commented Jul 12, 2023

Let's drop the CASTs for a moment, so that the query is

		UPDATE syncv3_events AS e
		SET before_state_snapshot_id = u.before_state_snapshot_id,
		    event_replaces_nid = u.event_replaces_nid
		FROM (
			VALUES (:before_state_snapshot_id, :event_replaces_nid, :event_nid)
		) AS u(before_state_snapshot_id, event_replaces_nid, event_nid)
		WHERE e.event_nid = u.event_nid

Suppose we have two updates that we want to apply, corresponding to two VALUES expressions (bss1, replace1, nid1) and (bss2, replace2, nid2). My understanding is that sqlx will transform this query into

		UPDATE syncv3_events AS e
		SET before_state_snapshot_id = u.before_state_snapshot_id,
		    event_replaces_nid = u.event_replaces_nid
		FROM (
			VALUES ($1, $2, $3), ($4, $5, $6)
		) AS u(before_state_snapshot_id, event_replaces_nid, event_nid)
		WHERE e.event_nid = u.event_nid

and pass the query 6 parameters bss1, replace1, nid1, bss2, replace2, nid2 in that order.
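Concretely, the call site looks something like this. This is a sketch rather than a verbatim copy of the diff: the struct and field names are illustrative, the `db` tags are what drive the named parameters, and the CASTs from the real query are kept so that Postgres can infer the types of the VALUES columns.

	type beforeSnapshotUpdate struct {
		BeforeStateSnapshotID int64 `db:"before_state_snapshot_id"`
		EventReplacesNID      int64 `db:"event_replaces_nid"`
		EventNID              int64 `db:"event_nid"`
	}

	func updateBeforeSnapshotIDs(txn *sqlx.Tx, chunk []beforeSnapshotUpdate) error {
		// sqlx.Named repeats the single VALUES tuple once per element of chunk
		// and flattens the struct fields into positional arguments, in order.
		query, args, err := sqlx.Named(`
			UPDATE syncv3_events AS e
			SET before_state_snapshot_id = u.before_state_snapshot_id,
			    event_replaces_nid = u.event_replaces_nid
			FROM (
				VALUES (
					CAST(:before_state_snapshot_id AS bigint),
					CAST(:event_replaces_nid AS bigint),
					CAST(:event_nid AS bigint)
				)
			) AS u(before_state_snapshot_id, event_replaces_nid, event_nid)
			WHERE e.event_nid = u.event_nid`, chunk)
		if err != nil {
			return err
		}
		// Rebind converts the ? placeholders to $1, $2, ... for Postgres.
		_, err = txn.Exec(txn.Rebind(query), args...)
		return err
	}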

@DMRobertson (Contributor, PR author)

https://www.psycopg.org/docs/extras.html#psycopg2.extras.execute_values seems to suggest that it does a similar thing (generate an appropriately-sized VALUES expression). I think psycopg2 does a better job of it, too.

@@ -946,3 +979,110 @@ func TestEventTableSelectUnknownEventIDs(t *testing.T) {
}
}
}

func TestEventTableUpdateBeforeSnapshotIDs(t *testing.T) {
@kegsay (Member)

You're going to need a lot more tests to convince me that the SQL is fine; a rough table-driven skeleton is sketched after this list. Specifically:

  • UpdateBeforeSnapshotIDs with no beforeSnapshotUpdate
  • UpdateBeforeSnapshotIDs with exactly 1 beforeSnapshotUpdate
  • UpdateBeforeSnapshotIDs with len(beforeSnapshotUpdate) > chunk size.
  • beforeSnapshotUpdate has some known and some unknown event NIDs.
  • beforeSnapshotUpdate has some duplicate event NIDs and some not duplicated.
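Something like this skeleton, say (placeholder names only; it would need to be wired into the real test setup for the events table):

	func TestEventTableUpdateBeforeSnapshotIDsCases(t *testing.T) {
		testCases := []struct {
			name    string
			updates []beforeSnapshotUpdate
		}{
			{name: "no updates"},
			{name: "exactly one update"},
			{name: "more updates than the chunk size"},
			{name: "a mix of known and unknown event NIDs"},
			{name: "duplicate event NIDs alongside unique ones"},
		}
		for _, tc := range testCases {
			t.Run(tc.name, func(t *testing.T) {
				// TODO: insert fixture events, call UpdateBeforeSnapshotIDs(txn, tc.updates),
				// then assert on before_state_snapshot_id and event_replaces_nid.
			})
		}
	}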

@DMRobertson (Contributor, PR author)

That's very fair. I am also suspicious of this arcane incantation.

@DMRobertson (Contributor, PR author)

"beforeSnapshotUpdate has some duplicate event NIDs and some not duplicated."

I think this is essentially undefined behaviour. Quoting postgres' docs:

When a FROM clause is present, what essentially happens is that the target table is joined to the tables mentioned in the from_item list, and each output row of the join represents an update operation for the target table. When using FROM you should ensure that the join produces at most one output row for each row to be modified. In other words, a target row shouldn't join to more than one row from the other table(s). If it does, then only one of the join rows will be used to update the target row, but which one will be used is not readily predictable.

@DMRobertson (Contributor, PR author)

"That being said, we don't seem to call UpdateBeforeSnapshotIDs yet, so I'm curious where you think this is going to fit in."

00c97b8 roughs out the logic I have in mind. Though it looks like plenty of tests are failing, so that'll need investigation.

@DMRobertson (Contributor, PR author) commented Jul 12, 2023

Rebased atop main (after the merge of #200) and force-pushed, with apologies.

@DMRobertson DMRobertson force-pushed the dmr/batch-snapshot-ids branch 2 times, most recently from 9de9301 to 1361964, on July 12, 2023 at 11:49
@DMRobertson DMRobertson changed the base branch from dmr/debug-from-stable to main on July 12, 2023 at 11:49
@DMRobertson DMRobertson force-pushed the dmr/batch-snapshot-ids branch 2 times, most recently from 88826c6 to 93779be, on July 12, 2023 at 12:03
@DMRobertson (Contributor, PR author) commented Jul 12, 2023

"Before this change, the new TestLargeGappyStateBlock takes 9.11s; afterwards it takes 6.95s. So a ~25% improvement on my machine. Not as much as I was hoping for, but the effect might be larger in a deployment where the database communication RTT is larger."

Additional debugging shows that the bulk of time is now spent in v2Pub.Notify:

[screenshot omitted: timing breakdown showing v2Pub.Notify dominating]

and I'm guessing that we time out after 5s here?

case <-time.After(5 * time.Second):
	return fmt.Errorf("notify with payload %v timed out", p.Type())
}

Or maybe it simply takes 5s to process 5k events in

for i := range events {
	h.Dispatcher.OnNewEvent(ctx, p.RoomID, events[i], p.EventNIDs[i])
}
and
func (d *Dispatcher) OnNewEvent(
	ctx context.Context, roomID string, event json.RawMessage, nid int64,
) {
	ed := d.newEventData(event, roomID, nid)
	// update the tracker
	targetUser := ""
	membership := ""
	shouldForceInitial := false
	if ed.EventType == "m.room.member" && ed.StateKey != nil {
		targetUser = *ed.StateKey
		membership = ed.Content.Get("membership").Str
		switch membership {
		case "invite":
			// we only do this to track invite counts correctly.
			d.jrt.UsersInvitedToRoom([]string{targetUser}, ed.RoomID)
		case "join":
			if d.jrt.UserJoinedRoom(targetUser, ed.RoomID) {
				shouldForceInitial = true
			}
		case "ban":
			fallthrough
		case "leave":
			d.jrt.UserLeftRoom(targetUser, ed.RoomID)
		}
		ed.InviteCount = d.jrt.NumInvitedUsersForRoom(ed.RoomID)
	}
	// notify all people in this room
	userIDs, joinCount := d.jrt.JoinedUsersForRoom(ed.RoomID, func(userID string) bool {
		if userID == DispatcherAllUsers {
			return false // safety guard to prevent dupe global callbacks
		}
		return d.ReceiverForUser(userID) != nil
	})
	ed.JoinCount = joinCount
	d.notifyListeners(ctx, ed, userIDs, targetUser, shouldForceInitial, membership)
}

I wonder if it might make sense to have a bulk OnNewEvents which considers all events at once, although I'm not entirely sure how we'd speed that up.
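Purely as a sketch of the shape (nothing like this exists yet; the naive version below is no faster than the loop above, so the win would have to come from hoisting the per-room bookkeeping out of the per-event path):

	// OnNewEvents is a hypothetical batched entry point. As written it does the
	// same work as calling OnNewEvent in a loop; a real implementation would
	// hoist the JoinedUsersForRoom lookup and join/invite tracking so they run
	// once per room rather than once per event.
	func (d *Dispatcher) OnNewEvents(
		ctx context.Context, roomID string, events []json.RawMessage, nids []int64,
	) {
		for i := range events {
			d.OnNewEvent(ctx, roomID, events[i], nids[i])
		}
	}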

Comment on lines +510 to +519
func countStateEvents(events []Event) (count int64) {
	for _, event := range events {
		if event.IsState {
			count++
		}
	}
	return count
}
@DMRobertson (Contributor, PR author)

This could be inlined.

Comment on lines +91 to +92
membershipNIDs = make([]int64, 0, len(se))
otherNIDs = make([]int64, 0, len(se))
@DMRobertson (Contributor, PR author)

I chose to make this return non-nil slices when len(se) == 0 so that I didn't have to handle the nil case at the storage layer.

@DMRobertson DMRobertson marked this pull request as ready for review July 12, 2023 18:18
@DMRobertson DMRobertson changed the title from "WIP: batch db updates in Accumulator.Accumulate" to "Batch db updates in Accumulator.Accumulate" on Jul 12, 2023
@DMRobertson DMRobertson requested a review from kegsay July 12, 2023 18:22
@DMRobertson (Contributor, PR author)

@kegsay did we want to land this or not? I can't remember the outcome of our discussion.

@kegsay (Member) commented Jul 18, 2023

The outcome of our discussion was to close this PR on the basis that whilst yes it improves things by ~25%, it's still doing more work than we actually want.

Ideally, we would instead make the events in the state block part of the state snapshot and then add new timeline events in Accumulate only. This:

  • more accurately maps onto what upstream is telling us.
  • is much faster, as we aren't calculating a state snapshot for each of the 10,000s of state events.

This implies a third function in the accumulator between Initialise and Accumulate which can both snapshot and roll forward the timeline in a single txn.
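To sketch what I mean (hypothetical signature and names only; nothing in this PR implements it):

	// InitialiseAndAccumulate would handle a gappy sync in one transaction:
	// build a single snapshot straight from the state block, then append the
	// timeline events on top of that snapshot, without computing a snapshot
	// per state event the way Accumulate does today.
	func (a *Accumulator) InitialiseAndAccumulate(
		txn *sqlx.Tx, roomID string, stateBlock, timeline []json.RawMessage,
	) error {
		// 1. Insert the state block events and one snapshot describing them.
		// 2. Insert the timeline events, pointing at that snapshot as their
		//    before-state snapshot.
		return nil // sketch only
	}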
