Skip to content

Commit

Permalink
Merge pull request #449 from matrix-org/kegan/sequence-protection
Browse files Browse the repository at this point in the history
Protect against dropped databases
  • Loading branch information
kegsay authored Jun 5, 2024
2 parents 5da6865 + 7ea0b71 commit 58a4c6f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 4 deletions.
29 changes: 25 additions & 4 deletions sync3/extensions/todevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,30 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext
r.Limit = 100 // default to 100
}
l := logger.With().Str("user", extCtx.UserID).Str("device", extCtx.DeviceID).Logger()

mapMu.Lock()
lastSentPos, exists := deviceIDToSinceDebugOnly[extCtx.DeviceID]
internal.Logf(ctx, "to_device", "since=%v limit=%v last_sent=%v", r.Since, r.Limit, lastSentPos)
isFirstRequest := !exists
mapMu.Unlock()

// If this is the first time we've seen this device ID since starting up, ignore the client-provided 'since'
// value. This is done to protect against dropped postgres sequences. Consider:
// - 5 to-device messages arrive for Alice
// - Alice requests all messages, gets them and acks them so since=5, and the nextval() sequence is 6.
// - the server admin drops the DB and starts over again. The DB sequence starts back at 1.
// - 2 to-device messages arrive for Alice
// - Alice requests messages from since=5. No messages are returned as the 2 new messages have a lower sequence number.
// - Even worse, those 2 messages are deleted because sending since=5 ACKNOWLEDGES all messages <=5.
// By ignoring the first `since` on startup, we effectively force the client into sending since=0. In this scenario,
// it will then A) not delete anything as since=0 acknowledges nothing, B) return the 2 to-device events.
//
// The cost of this is that duplicate to-device events may be sent if the server restarts before the client
// has had time to send the ACK to the server. This isn't fatal as clients do suppress duplicate to-device events.
if isFirstRequest {
r.Since = ""
}

var from int64
var err error
if r.Since != "" {
Expand All @@ -82,10 +106,7 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
}
mapMu.Lock()
lastSentPos := deviceIDToSinceDebugOnly[extCtx.DeviceID]
internal.Logf(ctx, "to_device", "since=%v limit=%v last_sent=%v", r.Since, r.Limit, lastSentPos)
mapMu.Unlock()

if from < lastSentPos {
// we told the client about a newer position, but yet they are using an older position, yell loudly
l.Warn().Int64("last_sent", lastSentPos).Int64("recv", from).Bool("initial", extCtx.IsInitial).Msg(
Expand Down
53 changes: 53 additions & 0 deletions tests-integration/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package syncv3

import (
"encoding/json"
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -443,6 +445,57 @@ func TestExtensionToDevice(t *testing.T) {
m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(0)), m.MatchToDeviceMessages(newToDeviceMsgs))
}

// TestExtensionToDeviceSequence checks that a to-device request carrying a
// `since` value far higher than anything the server has stored still returns
// the pending, lower-numbered messages, and that the next_batch token handed
// back is lower than the `since` the client sent.
// This guards against dropped databases.
func TestExtensionToDeviceSequence(t *testing.T) {
	const (
		alice      = "@TestExtensionToDeviceSequence_alice:localhost"
		aliceToken = "ALICE_BEARER_TOKEN_TestExtensionToDeviceSequence"
		// A since position well beyond any sequence number this test can produce.
		hiSince = 999999
	)

	// Bring up the fake upstream (v2) server and the proxy (v3) under test.
	// Deferred closes run in reverse order: v3 first, then v2.
	connStr := testutils.PrepareDBConnectionString()
	v2 := runTestV2Server(t)
	v3 := runTestServer(t, v2, connStr)
	defer v2.close()
	defer v3.close()

	// Register the user and queue four to-device messages for them upstream.
	v2.addAccount(t, alice, aliceToken)
	pending := []json.RawMessage{
		json.RawMessage(`{"sender":"alice","type":"something","content":{"foo":"1"}}`),
		json.RawMessage(`{"sender":"alice","type":"something","content":{"foo":"2"}}`),
		json.RawMessage(`{"sender":"alice","type":"something","content":{"foo":"3"}}`),
		json.RawMessage(`{"sender":"alice","type":"something","content":{"foo":"4"}}`),
	}
	v2.queueResponse(alice, sync2.SyncResponse{
		ToDevice: sync2.EventsResponse{
			Events: pending,
		},
	})

	// Ask for to-device messages using the far-too-high since value.
	req := sync3.Request{
		Lists: map[string]sync3.RequestList{
			"a": {
				Ranges: sync3.SliceRanges{
					[2]int64{0, 10}, // doesn't matter
				},
			},
		},
		Extensions: extensions.Request{
			ToDevice: &extensions.ToDeviceRequest{
				Core:  extensions.Core{Enabled: &boolTrue},
				Since: strconv.Itoa(hiSince),
			},
		},
	}
	res := v3.mustDoV3Request(t, aliceToken, req)

	// The returned next_batch must parse as an integer and be strictly lower
	// than the since value that was sent.
	nextBatchIsLower := func(res *sync3.Response) error {
		got, err := strconv.Atoi(res.Extensions.ToDevice.NextBatch)
		if err != nil {
			return err
		}
		if got < hiSince {
			return nil
		}
		return fmt.Errorf("next_batch got %v wanted lower than %v", got, hiSince)
	}

	m.MatchResponse(
		t, res,
		m.MatchList("a", m.MatchV3Count(0)),
		m.MatchToDeviceMessages(pending),
		nextBatchIsLower,
	)
}

// tests that the account data extension works:
// 1- check global account data is sent on first connection
// 2- check global account data updates are proxied through
Expand Down

0 comments on commit 58a4c6f

Please sign in to comment.