Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge hotfix branch into main #200

Merged
merged 35 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
cc8e6d9
Track the time before processing a request
Jun 22, 2023
048a5ac
Track the number of slow requests
Jun 22, 2023
1d46a30
Update tests
Jun 22, 2023
bdcffda
Merge pull request #183 from matrix-org/dmr/setup-metrics
Jun 22, 2023
2829a7a
Log device ID after requests
Jun 23, 2023
f4e935c
Record user and device on context ASAP
Jun 23, 2023
b47ebaa
Log warning for slow requests
Jun 23, 2023
685ec94
Merge pull request #184 from matrix-org/dmr/more-debugging
Jun 23, 2023
a8d4f7a
Additional debugging
Jun 21, 2023
a78612e
Fix bad backport
Jun 23, 2023
f36c038
Rate limit pubsub.V2DeviceData updates to be at most 1 per second
kegsay Jun 27, 2023
0caeb03
kick ci?
kegsay Jun 27, 2023
82c21e6
Remove branches main for now to kick ci maybe
kegsay Jun 27, 2023
b9bc83d
Add WorkerPool and use it for OnE2EEData
kegsay Jun 28, 2023
b2ac518
Merge pull request #190 from matrix-org/kegan/bound-db-conns-e2ee-data
kegsay Jun 28, 2023
0342a99
bugfix: prevent clients starving themselves by constantly changing re…
kegsay Jul 4, 2023
54cb2cb
Don't rate limit on M_UNKNOWN_POS to allow more rapid recovery after …
kegsay Jul 4, 2023
aaea223
Don't needlessly hit the db for txn IDs for live events which were no…
kegsay Jul 4, 2023
0c95c56
logging: log less, aggregate more. Fix npe on slowReq metric
kegsay Jul 4, 2023
8336dd2
More trace logging
kegsay Jul 6, 2023
e22f30e
Add gauge for tracking pending EnsurePolling calls
kegsay Jul 6, 2023
365ed4c
nil checks
kegsay Jul 6, 2023
e67ba9a
Add more poller metrics
kegsay Jul 7, 2023
7621aa1
Track num polls in the right place
kegsay Jul 7, 2023
4d8f3d5
Track num_devices_pending_ensure_polling per device not per http req …
kegsay Jul 7, 2023
150821f
Add unregister hooks
kegsay Jul 7, 2023
f22ef91
bugfix: distinguish between a 0 invited_count and a missing invited_c…
kegsay Jul 7, 2023
41a7240
Report a metric for the size of gappy state blocks
Jul 10, 2023
5064f64
Log error message to stdout if poller panics
Jul 10, 2023
dcf8db3
Actually observe the new metric
Jul 10, 2023
ea25b81
Merge pull request #194 from matrix-org/dmr/metric-for-gappy-state
Jul 10, 2023
e947612
Fix #192: ignore unseen old events
kegsay Jul 11, 2023
b72ad3b
Merge branch 'main' into dmr/debug-from-stable
kegsay Jul 12, 2023
7380273
Merge pull request #198 from matrix-org/kegan/fix-backfill-invite
kegsay Jul 12, 2023
c1b0f0b
Review comments
kegsay Jul 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: Tests

on:
push:
branches: ["main"]
kegsay marked this conversation as resolved.
Show resolved Hide resolved
pull_request:

permissions:
Expand Down
7 changes: 6 additions & 1 deletion internal/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
// logging metadata for a single request
type data struct {
userID string
deviceID string
since int64
next int64
numRooms int
Expand All @@ -37,13 +38,14 @@ func RequestContext(ctx context.Context) context.Context {
}

// add the user ID to this request context. Need to have called RequestContext first.
func SetRequestContextUserID(ctx context.Context, userID string) {
func SetRequestContextUserID(ctx context.Context, userID, deviceID string) {
d := ctx.Value(ctxData)
if d == nil {
return
}
da := d.(*data)
da.userID = userID
da.deviceID = deviceID
if hub := sentry.GetHubFromContext(ctx); hub != nil {
sentry.ConfigureScope(func(scope *sentry.Scope) {
scope.SetUser(sentry.User{Username: userID})
Expand Down Expand Up @@ -79,6 +81,9 @@ func DecorateLogger(ctx context.Context, l *zerolog.Event) *zerolog.Event {
if da.userID != "" {
l = l.Str("u", da.userID)
}
if da.deviceID != "" {
l = l.Str("dev", da.deviceID)
}
if da.since >= 0 {
l = l.Int64("p", da.since)
}
Expand Down
67 changes: 67 additions & 0 deletions internal/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package internal

// WorkerPool runs queued functions with bounded concurrency: at most N
// pieces of work execute at any one time.
type WorkerPool struct {
	N  int         // number of worker goroutines launched by Start
	ch chan func() // buffered (capacity N) queue of pending work; closed by Stop
}

// NewWorkerPool creates a worker pool of size n. Up to n pieces of work can be
// done concurrently.
//
// Choose n based on the expected frequency of work and the contention on
// shared resources (cpu, memory, fds, db connections). Larger values allow
// more frequent work at the cost of more contention; smaller values throttle
// work but bound that contention. Ideally derive n from whatever shared
// resource limit you are hitting — e.g. with a database connection limit of
// 100, set n to some fraction of that limit rather than an arbitrary number
// below it. If more than n pieces of work are requested, WorkerPool.Queue
// eventually blocks until some work completes.
//
// Note that larger n also means a larger up-front memory cost, because the
// internal queue's backing array is allocated with capacity n.
func NewWorkerPool(n int) *WorkerPool {
	// The channel buffer is deliberately sized to n. With n workers we can
	// process n pieces of work concurrently; allowing up to n further pieces
	// to be queued before Queue blocks applies backpressure on the producer,
	// stopping it from creating ever more work and hence bounding memory
	// consumption. Work is still produced upstream on the homeserver, but we
	// consume it when we are ready rather than gobbling it all at once.
	//
	// The buffer is not forced to be n — it is simply a value that scales
	// sensibly with the number of workers. Smaller, and the channel becomes
	// the bottleneck under a burst of instantaneous work; larger, and make()
	// needlessly allocates a bigger backing array up front.
	return &WorkerPool{
		N:  n,
		ch: make(chan func(), n),
	}
}

// Start launches the workers. Only call this once.
func (wp *WorkerPool) Start() {
	for w := 0; w < wp.N; w++ {
		go wp.worker()
	}
}

// Stop shuts the worker pool down. Only call this once. This is mostly useful
// for tests: in production a pool should be started once and persist for the
// lifetime of the process, otherwise it causes needless goroutine churn.
func (wp *WorkerPool) Stop() {
	close(wp.ch)
}

// Queue submits work to the pool. May or may not block until some work is
// processed.
func (wp *WorkerPool) Queue(fn func()) {
	wp.ch <- fn
}

// worker drains the queue, executing each function in turn, until Stop closes
// the channel.
func (wp *WorkerPool) worker() {
	for {
		fn, ok := <-wp.ch
		if !ok {
			return
		}
		fn()
	}
}
186 changes: 186 additions & 0 deletions internal/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package internal

import (
"sync"
"testing"
"time"
)

// Test basic functions of WorkerPool: queued work runs, and runs concurrently.
func TestWorkerPool(t *testing.T) {
	pool := NewWorkerPool(2)
	pool.Start()
	defer pool.Stop()

	// With N=2 both tasks should execute concurrently, so the pair should
	// finish in ~1s rather than the ~2s serial execution would take.
	var pending sync.WaitGroup
	pending.Add(2)
	begin := time.Now()
	for i := 0; i < 2; i++ {
		pool.Queue(func() {
			time.Sleep(time.Second)
			pending.Done()
		})
	}
	pending.Wait()
	elapsed := time.Since(begin)
	if elapsed > 2*time.Second {
		t.Fatalf("took %v for queued work, it should have been faster than 2s", elapsed)
	}
}

// TestWorkerPoolDoesWorkPriorToStart checks that work queued before Start()
// is not executed early, and is executed once Start() is called.
func TestWorkerPoolDoesWorkPriorToStart(t *testing.T) {
	pool := NewWorkerPool(2)

	// each completed piece of work reports its id on this channel
	done := make(chan int, 2)
	pool.Queue(func() {
		done <- 1
	})
	pool.Queue(func() {
		done <- 2
	})

	// nothing should have executed yet
	time.Sleep(100 * time.Millisecond)
	if len(done) > 0 {
		t.Fatalf("Queued work was done before Start()")
	}

	// now the work should begin
	pool.Start()
	defer pool.Stop()

	// wait until the reported ids sum to 3 (= 1 + 2), i.e. both tasks ran
	total := 0
	for total != 3 {
		select {
		case <-time.After(time.Second):
			t.Fatalf("timed out waiting for work to be done")
		case id := <-done:
			total += id
		}
	}
}

// workerState tracks the lifecycle of a single piece of work scheduled in
// TestWorkerPoolBackpressure. Access is guarded by the test's mutex.
type workerState struct {
	id      int
	state   int             // not running, queued, running, finished
	unblock *sync.WaitGroup // decrement to unblock this worker
}

// TestWorkerPoolBackpressure checks that WorkerPool.Queue blocks once the pool
// is saturated: with n workers and a channel buffer of n, the (2n+1)th piece
// of work cannot even be queued until a running task finishes. Tasks are held
// open via per-task WaitGroups so the test can release them one at a time and
// observe the queue advancing.
func TestWorkerPoolBackpressure(t *testing.T) {
	// this test assumes backpressure starts at n*2+1 due to a chan buffer of size n, and n in-flight work.
	n := 2
	wp := NewWorkerPool(n)
	wp.Start()
	defer wp.Stop()

	// mu guards every workerState in `running`, shared between the producer
	// goroutine, the pool workers, and the assertions below.
	var mu sync.Mutex
	stateNotRunning := 0
	stateQueued := 1
	stateRunning := 2
	stateFinished := 3
	size := (2 * n) + 1
	running := make([]*workerState, size)

	go func() {
		// we test backpressure by scheduling (n*2)+1 work and ensuring that we see the following running states:
		// [2,2,1,1,0] <-- 2 running, 2 queued, 1 blocked <-- THIS IS BACKPRESSURE
		// [3,2,2,1,1] <-- 1 finished, 2 running, 2 queued
		// [3,3,2,2,1] <-- 2 finished, 2 running , 1 queued
		// [3,3,3,2,2] <-- 3 finished, 2 running
		for i := 0; i < size; i++ {
			// set initial state of this piece of work
			wg := &sync.WaitGroup{}
			wg.Add(1)
			state := &workerState{
				id:      i,
				state:   stateNotRunning,
				unblock: wg,
			}
			mu.Lock()
			running[i] = state
			mu.Unlock()

			// queue the work on the pool. The final piece of work will block here and remain in
			// stateNotRunning and not transition to stateQueued until the first piece of work is done.
			wp.Queue(func() {
				mu.Lock()
				if running[state.id].state != stateQueued {
					// we ran work in the worker faster than the code underneath .Queue, so let it catch up
					mu.Unlock()
					time.Sleep(10 * time.Millisecond)
					mu.Lock()
				}
				running[state.id].state = stateRunning
				mu.Unlock()

				// block until the test explicitly releases this task via unblock.Done()
				running[state.id].unblock.Wait()
				mu.Lock()
				running[state.id].state = stateFinished
				mu.Unlock()
			})

			// mark this work as queued
			mu.Lock()
			running[i].state = stateQueued
			mu.Unlock()
		}
	}()

	// wait for the workers to be doing work and assert the states of each task
	time.Sleep(time.Second)

	assertStates(t, &mu, running, []int{
		stateRunning, stateRunning, stateQueued, stateQueued, stateNotRunning,
	})

	// now let the first task complete
	running[0].unblock.Done()
	// wait for the pool to grab more work
	time.Sleep(100 * time.Millisecond)
	// assert new states
	assertStates(t, &mu, running, []int{
		stateFinished, stateRunning, stateRunning, stateQueued, stateQueued,
	})

	// now let the second task complete
	running[1].unblock.Done()
	// wait for the pool to grab more work
	time.Sleep(100 * time.Millisecond)
	// assert new states
	assertStates(t, &mu, running, []int{
		stateFinished, stateFinished, stateRunning, stateRunning, stateQueued,
	})

	// now let the third task complete
	running[2].unblock.Done()
	// wait for the pool to grab more work
	time.Sleep(100 * time.Millisecond)
	// assert new states
	assertStates(t, &mu, running, []int{
		stateFinished, stateFinished, stateFinished, stateRunning, stateRunning,
	})

}

// assertStates locks mu and checks that each entry of `running` is in the
// corresponding expected state, reporting any mismatch via t.
func assertStates(t *testing.T, mu *sync.Mutex, running []*workerState, wantStates []int) {
	t.Helper()
	mu.Lock()
	defer mu.Unlock()
	if len(running) != len(wantStates) {
		t.Fatalf("assertStates: bad wantStates length, got %d want %d", len(wantStates), len(running))
	}
	for idx, ws := range running {
		if want := wantStates[idx]; ws.state != want {
			t.Errorf("work[%d] got state %d want %d", idx, ws.state, want)
		}
	}
}
4 changes: 1 addition & 3 deletions pubsub/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ type V2InitialSyncComplete struct {
// Type returns the unique name of this payload type.
func (*V2InitialSyncComplete) Type() string { return "V2InitialSyncComplete" }

// V2DeviceData is a batched notification that device data changed. It maps
// each affected user ID to the device IDs which have pending updates.
// (This replaced the earlier per-update UserID/DeviceID/Pos fields so that
// updates can be rate limited and delivered in batches.)
type V2DeviceData struct {
	UserIDToDeviceIDs map[string][]string
}

// Type returns the unique name of this payload type.
func (*V2DeviceData) Type() string { return "V2DeviceData" }
Expand Down
90 changes: 90 additions & 0 deletions sync2/device_data_ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package sync2

import (
"sync"
"time"

"github.com/matrix-org/sliding-sync/pubsub"
)

// This struct remembers user+device IDs to notify for then periodically
// emits them all to the caller. Use to rate limit the frequency of device list
// updates.
type DeviceDataTicker struct {
	// data structures to periodically notify downstream about device data updates
	// The ticker controls the frequency of updates. The done channel is used to stop ticking
	// and clean up the goroutine. The notify map contains the values to notify for.
	ticker    *time.Ticker // nil when the ticker was constructed with d == 0 (synchronous mode)
	done      chan struct{}
	notifyMap *sync.Map // map of PollerID to bools, unwrapped when notifying
	fn        func(payload *pubsub.V2DeviceData)
}

// NewDeviceDataTicker creates a device data ticker which batches calls to
// Remember and invokes a callback every d duration. If d is 0, no batching is
// performed and the callback is invoked synchronously, which is useful for
// testing.
func NewDeviceDataTicker(d time.Duration) *DeviceDataTicker {
	ticker := &DeviceDataTicker{
		done:      make(chan struct{}),
		notifyMap: &sync.Map{},
	}
	// a zero duration means "no ticker": Remember emits synchronously instead
	if d != 0 {
		ticker.ticker = time.NewTicker(d)
	}
	return ticker
}

// Stop ends the periodic ticking (if any) and signals Run to return.
// Only call this once: done is closed here, so a second call would panic.
func (t *DeviceDataTicker) Stop() {
	if tk := t.ticker; tk != nil {
		tk.Stop()
	}
	close(t.done)
}

// Set the function which should be called when the tick happens.
// NOTE(review): t.fn is read by emitUpdate without synchronisation, so this
// should be called before Run()/Remember() — confirm callers do so.
func (t *DeviceDataTicker) SetCallback(fn func(payload *pubsub.V2DeviceData)) {
	t.fn = fn
}

// Remember records this user/device ID so it is emitted in a later batch.
// When batching is disabled (no ticker configured), the update is emitted
// immediately instead.
func (t *DeviceDataTicker) Remember(pid PollerID) {
	t.notifyMap.Store(pid, true)
	if t.ticker != nil {
		return // the ticker goroutine will emit this later
	}
	t.emitUpdate()
}

// emitUpdate drains the set of remembered PollerIDs into a single
// pubsub.V2DeviceData payload, grouping device IDs by user ID, and passes it
// to the registered callback. It is a no-op when nothing has been remembered
// since the last emit.
func (t *DeviceDataTicker) emitUpdate() {
	var p pubsub.V2DeviceData
	p.UserIDToDeviceIDs = make(map[string][]string)
	// populate the pubsub payload
	t.notifyMap.Range(func(key, value any) bool {
		pid := key.(PollerID)
		devices := p.UserIDToDeviceIDs[pid.UserID]
		devices = append(devices, pid.DeviceID)
		p.UserIDToDeviceIDs[pid.UserID] = devices
		// clear the map of this value
		t.notifyMap.Delete(key)
		return true // keep enumerating
	})
	// notify if we have entries. Guarding t.fn prevents a nil-pointer panic
	// if Remember fires (synchronous d == 0 mode) before SetCallback has been
	// called; the drained entries are dropped in that case, matching the
	// drain-first behaviour above.
	if len(p.UserIDToDeviceIDs) > 0 && t.fn != nil {
		t.fn(&p)
	}
}

// Run blocks, emitting a batched update on every tick until Stop() is called.
// It returns immediately when batching is disabled (d == 0 in the
// constructor), since there is nothing to tick.
func (t *DeviceDataTicker) Run() {
	if t.ticker == nil {
		return
	}
	for {
		select {
		case <-t.ticker.C:
			t.emitUpdate()
		case <-t.done:
			return
		}
	}
}
Loading
Loading