bugfix: prevent clients starving themselves by constantly changing req params

Because the proxy services changes to req params preferentially over live
data, a client that constantly changes the window (e.g. due to spidering)
can accidentally stop the delivery of live events to itself until the
spidering process is complete. To help address this, we now process live
updates _even if_ we have some data to send to the client. This is bounded
in size to prevent the inverse happening: constantly seeing new live events,
which would starve changes to req params. This should hopefully strike the
right balance.

With regression test.
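
The gist of the fix, as a minimal standalone sketch (toy names and types, not the proxy's real API):

```go
package main

import "fmt"

// processRequest models one /sync request. Even when the response already has
// data to send (because the client changed its req params), we drain a bounded
// number of queued live updates so they are not deferred indefinitely.
func processRequest(hasDataAlready bool, updates chan string) []string {
	var resp []string
	if !hasDataAlready {
		// normal path: block waiting for live updates (elided in this sketch)
	}
	// the fix: also drain whatever was queued when we checked, and no more
	n := len(updates)
	for i := 0; i < n; i++ {
		resp = append(resp, <-updates)
	}
	return resp
}

func main() {
	updates := make(chan string, 16)
	updates <- "live event 1"
	updates <- "live event 2"
	// the client keeps changing its window, so every request already has data
	resp := processRequest(true, updates)
	fmt.Println(resp) // the queued live events are still delivered
}
```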
kegsay committed Jul 4, 2023
1 parent b2ac518 commit 0342a99
Showing 3 changed files with 102 additions and 25 deletions.
48 changes: 29 additions & 19 deletions sync3/handler/connstate_live.go
@@ -81,35 +81,45 @@ func (s *connStateLive) liveUpdate(
return
case update := <-s.updates:
internal.Logf(ctx, "liveUpdate", "process live update")

s.processLiveUpdate(ctx, update, response)
// pass event to extensions AFTER processing
roomIDsToLists := s.lists.ListsByVisibleRoomIDs(s.muxedReq.Lists)
s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{
IsInitial: false,
RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
UserID: s.userID,
DeviceID: s.deviceID,
RoomIDsToLists: roomIDsToLists,
})
s.processUpdate(ctx, update, response, ex)
// if there are more updates and we don't have lots stacked up already, go ahead and process another
for len(s.updates) > 0 && response.ListOps() < 50 {
update = <-s.updates
s.processLiveUpdate(ctx, update, response)
s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{
IsInitial: false,
RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
UserID: s.userID,
DeviceID: s.deviceID,
RoomIDsToLists: roomIDsToLists,
})
s.processUpdate(ctx, update, response, ex)
}
}
}

// If a client constantly changes their request params in every request they make, we will never consume from
// the update channel, as the response will always have data already. In an effort to prevent starvation of new
// data, we process some updates even though we already have data, but only if A) we didn't already live-stream
// naturally in this request, B) it isn't an initial request, and C) there are in fact queued updates to process.
numQueuedUpdates := len(s.updates)
if !hasLiveStreamed && !isInitial && numQueuedUpdates > 0 {
for i := 0; i < numQueuedUpdates; i++ {
update := <-s.updates
s.processUpdate(ctx, update, response, ex)
}
log.Debug().Int("num_queued", numQueuedUpdates).Msg("liveUpdate: caught up")
}

log.Trace().Bool("live_streamed", hasLiveStreamed).Msg("liveUpdate: returning")
// TODO: op consolidation
}

func (s *connStateLive) processUpdate(ctx context.Context, update caches.Update, response *sync3.Response, ex extensions.Request) {
s.processLiveUpdate(ctx, update, response)
// pass event to extensions AFTER processing
roomIDsToLists := s.lists.ListsByVisibleRoomIDs(s.muxedReq.Lists)
s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{
IsInitial: false,
RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
UserID: s.userID,
DeviceID: s.deviceID,
RoomIDsToLists: roomIDsToLists,
})
}

func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, response *sync3.Response) bool {
internal.AssertWithContext(ctx, "processLiveUpdate: response list length != internal list length", s.lists.Len() == len(response.Lists))
internal.AssertWithContext(ctx, "processLiveUpdate: request list length != internal list length", s.lists.Len() == len(s.muxedReq.Lists))
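
The A/B/C comment above hinges on how the drain loop is bounded. A sketch of why the new code snapshots the queue length rather than looping until the channel is empty (illustrative types, not the real connStateLive):

```go
package main

import "fmt"

func process(u string) { fmt.Println("processed", u) }

func drainBounded(updates chan string) {
	// Snapshot the length once: we only drain updates that were queued when we
	// started, so a steady stream of new live events cannot starve req-param
	// processing (the inverse starvation the commit message warns about).
	n := len(updates)
	for i := 0; i < n; i++ {
		process(<-updates)
	}
	// The unbounded alternative could spin for as long as producers keep pace:
	//   for len(updates) > 0 { process(<-updates) }
}

func main() {
	updates := make(chan string, 8)
	updates <- "a"
	updates <- "b"
	drainBounded(updates) // processes exactly the two queued updates
}
```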
70 changes: 70 additions & 0 deletions tests-e2e/num_live_test.go
@@ -1,10 +1,13 @@
package syncv3_test

import (
"fmt"
"testing"
"time"

"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils/m"
"github.com/tidwall/gjson"
)

func TestNumLive(t *testing.T) {
@@ -126,3 +129,70 @@ func TestNumLive(t *testing.T) {
},
}))
}

// Test that if you constantly change req params, we still see live traffic. It does this by:
// - Creating 11 rooms.
// - Hitting /sync with a range of [0,0], then [0,1], and so on up to [0,9]. Each widening of the range causes a new room to be returned.
// - Interleaving each /sync request with genuine events sent into a room.
// - Ensuring we see the genuine events by the time we finish.
func TestReqParamStarvation(t *testing.T) {
alice := registerNewUser(t)
bob := registerNewUser(t)
roomID := alice.CreateRoom(t, map[string]interface{}{
"preset": "public_chat",
})
numOtherRooms := 10
for i := 0; i < numOtherRooms; i++ {
bob.CreateRoom(t, map[string]interface{}{
"preset": "public_chat",
})
}
bob.JoinRoom(t, roomID, nil)
res := bob.SlidingSyncUntilMembership(t, "", roomID, bob, "join")

wantEventIDs := make(map[string]bool)
for i := 0; i < numOtherRooms; i++ {
res = bob.SlidingSync(t, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, int64(i)}}, // [0,0], [0,1], ... [0,9]
},
},
}, WithPos(res.Pos))

// mark off any event we see in wantEventIDs
for _, r := range res.Rooms {
for _, ev := range r.Timeline {
gotEventID := gjson.GetBytes(ev, "event_id").Str
wantEventIDs[gotEventID] = false
}
}

// send an event during the first few /syncs to add to wantEventIDs.
// We don't dictate which response these events should arrive in, as we do not
// know and cannot force the proxy to deliver an event in a particular response.
if i < 3 {
eventID := alice.SendEventSynced(t, roomID, Event{
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": fmt.Sprintf("msg %d", i),
},
})
wantEventIDs[eventID] = true
}

// it's possible the proxy hasn't seen this event yet, and that, rather than
// starvation, is why we don't see it in the next response. To try to counter
// this, sleep a bit. This is why we sleep on every cycle and why we send the
// events early on.
time.Sleep(50 * time.Millisecond)
}

// at this point wantEventIDs should all have false values if we got the events
for evID, unseen := range wantEventIDs {
if unseen {
t.Errorf("failed to see event %v", evID)
}
}
}
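
The test's bookkeeping reduces to a simple idiom, sketched standalone below (illustrative event IDs): entries are set to true when sent and flipped to false when observed, so any surviving true value is a starved event.

```go
package main

import "fmt"

func main() {
	wantEventIDs := map[string]bool{}

	// an event we sent and must eventually see in some later /sync response
	wantEventIDs["$msg0"] = true

	// a later response delivers it (alongside unrelated events): mark it off
	for _, gotEventID := range []string{"$msg0", "$unrelated"} {
		wantEventIDs[gotEventID] = false
	}

	// any remaining true value means an event was never delivered
	for evID, unseen := range wantEventIDs {
		if unseen {
			fmt.Printf("failed to see event %v\n", evID)
		}
	}
}
```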
9 changes: 3 additions & 6 deletions tests-integration/room_subscriptions_test.go
@@ -137,12 +137,9 @@ func TestRoomSubscriptionMisorderedTimeline(t *testing.T) {
})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
// TODO: this is the correct result, but due to how timeline loading works currently
// it will be returning the last 5 events BEFORE D,E, which isn't ideal but also isn't
// incorrect per se due to the fact that clients don't know when D,E have been processed
// on the server.
// m.MatchRoomTimeline(append(abcInitialEvents, deLiveEvents...)),
m.MatchRoomTimeline(append(roomState[len(roomState)-2:], abcInitialEvents...)),
// we append live events AFTER processing the new timeline limit, so 7 events not 5.
// TODO: ideally we'd just return abcde here.
m.MatchRoomTimeline(append(roomState[len(roomState)-2:], append(abcInitialEvents, deLiveEvents...)...)),
},
}), m.LogResponse(t))

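The new expectation composes three slices with nested appends. A quick standalone check of the arithmetic (stand-in event names for the test's slices):

```go
package main

import "fmt"

func main() {
	stateTail := []string{"create", "member"} // roomState[len(roomState)-2:]
	abc := []string{"A", "B", "C"}            // abcInitialEvents
	de := []string{"D", "E"}                  // deLiveEvents

	// live events are appended AFTER the reloaded timeline, hence 7 events not 5
	got := append(stateTail, append(abc, de...)...)
	fmt.Println(len(got), got) // 7 [create member A B C D E]
}
```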
