Merge pull request #200 from matrix-org/dmr/debug-from-stable

Merge hotfix branch into main

kegsay authored Jul 12, 2023
2 parents 57c25ab + c1b0f0b commit ad7c264
Showing 28 changed files with 1,175 additions and 266 deletions.
7 changes: 6 additions & 1 deletion internal/context.go
@@ -16,6 +16,7 @@ var (
// logging metadata for a single request
type data struct {
userID string
+deviceID string
since int64
next int64
numRooms int
@@ -37,13 +38,14 @@ func RequestContext(ctx context.Context) context.Context {
}

// add the user ID to this request context. Need to have called RequestContext first.
-func SetRequestContextUserID(ctx context.Context, userID string) {
+func SetRequestContextUserID(ctx context.Context, userID, deviceID string) {
d := ctx.Value(ctxData)
if d == nil {
return
}
da := d.(*data)
da.userID = userID
+da.deviceID = deviceID
if hub := sentry.GetHubFromContext(ctx); hub != nil {
sentry.ConfigureScope(func(scope *sentry.Scope) {
scope.SetUser(sentry.User{Username: userID})
@@ -79,6 +81,9 @@ func DecorateLogger(ctx context.Context, l *zerolog.Event) *zerolog.Event {
if da.userID != "" {
l = l.Str("u", da.userID)
}
+if da.deviceID != "" {
+l = l.Str("dev", da.deviceID)
+}
if da.since >= 0 {
l = l.Int64("p", da.since)
}
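
For illustration only, a minimal sketch of how a request handler inside the same module might wire these helpers together; the handler, logger setup and import paths are assumptions, not part of this diff.

package handler

import (
    "net/http"
    "os"

    "github.com/matrix-org/sliding-sync/internal" // import path assumed
    "github.com/rs/zerolog"
)

// handle is a hypothetical handler showing how the new deviceID parameter
// flows into the logging context; it is not part of this commit.
func handle(w http.ResponseWriter, req *http.Request, userID, deviceID string) {
    // Attach logging metadata to the request context, now including the device ID.
    ctx := internal.RequestContext(req.Context())
    internal.SetRequestContextUserID(ctx, userID, deviceID)

    // DecorateLogger emits "u" and, after this change, "dev" on the log line.
    logger := zerolog.New(os.Stdout)
    internal.DecorateLogger(ctx, logger.Info()).Msg("handling request")
}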
67 changes: 67 additions & 0 deletions internal/pool.go
@@ -0,0 +1,67 @@
package internal

type WorkerPool struct {
N int
ch chan func()
}

// Create a new worker pool of size N. Up to N work can be done concurrently.
// The size of N depends on the expected frequency of work and contention for
// shared resources. Large values of N allow more frequent work at the cost of
// more contention for shared resources like cpu, memory and fds. Small values
// of N allow less frequent work but control the amount of shared resource contention.
// Ideally this value will be derived from whatever shared resource constraints you
// are hitting up against, rather than set to a fixed value. For example, if you have
// a database connection limit of 100, then setting N to some fraction of the limit is
// preferred to setting this to an arbitrary number < 100. If more than N work is requested,
// eventually WorkerPool.Queue will block until some work is done.
//
// The larger N is, the larger the up front memory costs are due to the implementation of WorkerPool.
func NewWorkerPool(n int) *WorkerPool {
return &WorkerPool{
N: n,
// If we have N workers, we can process N work concurrently.
// If we have >N work, we need to apply backpressure to stop us
// making more and more work which takes up more and more memory.
// By setting the channel size to N, we ensure that backpressure is
// being applied on the producer, stopping it from creating more work,
// and hence bounding memory consumption. Work is still being produced
// upstream on the homeserver, but we will consume it when we're ready
// rather than gobble it all at once.
//
// Note: we aren't forced to set this to N, it just serves as a useful
// metric which scales on the number of workers. The amount of in-flight
// work is N, so it makes sense to allow up to N work to be queued up before
// applying backpressure. If the channel buffer is < N then the channel can
// become the bottleneck in the case where we have lots of instantaneous work
// to do. If the channel buffer is too large, we needlessly consume memory as
// make() will allocate a backing array of whatever size you give it up front (sad face)
ch: make(chan func(), n),
}
}

// Start the workers. Only call this once.
func (wp *WorkerPool) Start() {
for i := 0; i < wp.N; i++ {
go wp.worker()
}
}

// Stop the worker pool. Only really useful for tests as a worker pool should be started once
// and persist for the lifetime of the process, else it causes needless goroutine churn.
// Only call this once.
func (wp *WorkerPool) Stop() {
close(wp.ch)
}

// Queue some work on the pool. May or may not block until some work is processed.
func (wp *WorkerPool) Queue(fn func()) {
wp.ch <- fn
}

// worker impl
func (wp *WorkerPool) worker() {
for fn := range wp.ch {
fn()
}
}
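
As a usage sketch only (not part of this commit), here is how a caller might size and drive the pool following the sizing advice in the NewWorkerPool comment; the connection-limit figure, the processUser helper and the import path are hypothetical.

package main

import (
    "fmt"
    "sync"

    "github.com/matrix-org/sliding-sync/internal" // import path assumed
)

// processUser stands in for real per-user work (e.g. a database write).
func processUser(userID string) {
    fmt.Println("processed", userID)
}

func main() {
    // Derive N from a shared resource constraint rather than an arbitrary
    // constant, e.g. a fraction of a hypothetical DB connection limit.
    dbConnLimit := 100
    wp := internal.NewWorkerPool(dbConnLimit / 4)
    wp.Start()
    defer wp.Stop()

    users := []string{"@alice:example.org", "@bob:example.org", "@carol:example.org"}
    var wg sync.WaitGroup
    for _, u := range users {
        u := u // capture the loop variable for the closure
        wg.Add(1)
        // Queue blocks once N tasks are running and N more are buffered,
        // which is the backpressure described above.
        wp.Queue(func() {
            defer wg.Done()
            processUser(u)
        })
    }
    wg.Wait()
}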
186 changes: 186 additions & 0 deletions internal/pool_test.go
@@ -0,0 +1,186 @@
package internal

import (
"sync"
"testing"
"time"
)

// Test basic functions of WorkerPool
func TestWorkerPool(t *testing.T) {
wp := NewWorkerPool(2)
wp.Start()
defer wp.Stop()

// we should process this concurrently as N=2 so it should take 1s not 2s
var wg sync.WaitGroup
wg.Add(2)
start := time.Now()
wp.Queue(func() {
time.Sleep(time.Second)
wg.Done()
})
wp.Queue(func() {
time.Sleep(time.Second)
wg.Done()
})
wg.Wait()
took := time.Since(start)
if took > 2*time.Second {
t.Fatalf("took %v for queued work, it should have been faster than 2s", took)
}
}

func TestWorkerPoolDoesWorkPriorToStart(t *testing.T) {
wp := NewWorkerPool(2)

// return channel to use to see when work is done
ch := make(chan int, 2)
wp.Queue(func() {
ch <- 1
})
wp.Queue(func() {
ch <- 2
})

// the work should not be done yet
time.Sleep(100 * time.Millisecond)
if len(ch) > 0 {
t.Fatalf("Queued work was done before Start()")
}

// the work should be starting now
wp.Start()
defer wp.Stop()

sum := 0
for {
select {
case <-time.After(time.Second):
t.Fatalf("timed out waiting for work to be done")
case val := <-ch:
sum += val
}
if sum == 3 { // 2 + 1
break
}
}
}

type workerState struct {
id int
state int // not running, queued, running, finished
unblock *sync.WaitGroup // decrement to unblock this worker
}

func TestWorkerPoolBackpressure(t *testing.T) {
// this test assumes backpressure starts at n*2+1 due to a chan buffer of size n, and n in-flight work.
n := 2
wp := NewWorkerPool(n)
wp.Start()
defer wp.Stop()

var mu sync.Mutex
stateNotRunning := 0
stateQueued := 1
stateRunning := 2
stateFinished := 3
size := (2 * n) + 1
running := make([]*workerState, size)

go func() {
// we test backpressure by scheduling (n*2)+1 work and ensuring that we see the following running states:
// [2,2,1,1,0] <-- 2 running, 2 queued, 1 blocked <-- THIS IS BACKPRESSURE
// [3,2,2,1,1] <-- 1 finished, 2 running, 2 queued
// [3,3,2,2,1] <-- 2 finished, 2 running, 1 queued
// [3,3,3,2,2] <-- 3 finished, 2 running
for i := 0; i < size; i++ {
// set initial state of this piece of work
wg := &sync.WaitGroup{}
wg.Add(1)
state := &workerState{
id: i,
state: stateNotRunning,
unblock: wg,
}
mu.Lock()
running[i] = state
mu.Unlock()

// queue the work on the pool. The final piece of work will block here and remain in
// stateNotRunning and not transition to stateQueued until the first piece of work is done.
wp.Queue(func() {
mu.Lock()
if running[state.id].state != stateQueued {
// we ran work in the worker faster than the code underneath .Queue, so let it catch up
mu.Unlock()
time.Sleep(10 * time.Millisecond)
mu.Lock()
}
running[state.id].state = stateRunning
mu.Unlock()

running[state.id].unblock.Wait()
mu.Lock()
running[state.id].state = stateFinished
mu.Unlock()
})

// mark this work as queued
mu.Lock()
running[i].state = stateQueued
mu.Unlock()
}
}()

// wait for the workers to be doing work and assert the states of each task
time.Sleep(time.Second)

assertStates(t, &mu, running, []int{
stateRunning, stateRunning, stateQueued, stateQueued, stateNotRunning,
})

// now let the first task complete
running[0].unblock.Done()
// wait for the pool to grab more work
time.Sleep(100 * time.Millisecond)
// assert new states
assertStates(t, &mu, running, []int{
stateFinished, stateRunning, stateRunning, stateQueued, stateQueued,
})

// now let the second task complete
running[1].unblock.Done()
// wait for the pool to grab more work
time.Sleep(100 * time.Millisecond)
// assert new states
assertStates(t, &mu, running, []int{
stateFinished, stateFinished, stateRunning, stateRunning, stateQueued,
})

// now let the third task complete
running[2].unblock.Done()
// wait for the pool to grab more work
time.Sleep(100 * time.Millisecond)
// assert new states
assertStates(t, &mu, running, []int{
stateFinished, stateFinished, stateFinished, stateRunning, stateRunning,
})

}

func assertStates(t *testing.T, mu *sync.Mutex, running []*workerState, wantStates []int) {
t.Helper()
mu.Lock()
defer mu.Unlock()
if len(running) != len(wantStates) {
t.Fatalf("assertStates: bad wantStates length, got %d want %d", len(wantStates), len(running))
}
for i := range running {
state := running[i]
wantVal := wantStates[i]
if state.state != wantVal {
t.Errorf("work[%d] got state %d want %d", i, state.state, wantVal)
}
}
}
4 changes: 1 addition & 3 deletions pubsub/v2.go
@@ -91,9 +91,7 @@ type V2InitialSyncComplete struct {
func (*V2InitialSyncComplete) Type() string { return "V2InitialSyncComplete" }

type V2DeviceData struct {
-UserID string
-DeviceID string
-Pos int64
+UserIDToDeviceIDs map[string][]string
}

func (*V2DeviceData) Type() string { return "V2DeviceData" }
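
For illustration only: the payload now batches device IDs per user rather than carrying a single user/device/pos triple. A minimal sketch of constructing the new struct, with the surrounding notifier plumbing omitted since it is not shown in this diff; the import path and values are assumed.

package main

import (
    "fmt"

    "github.com/matrix-org/sliding-sync/pubsub" // import path assumed
)

func main() {
    // One notification can now carry several users' device IDs at once.
    payload := &pubsub.V2DeviceData{
        UserIDToDeviceIDs: map[string][]string{
            "@alice:example.org": {"DEVICEAAA", "DEVICEBBB"},
            "@bob:example.org":   {"DEVICECCC"},
        },
    }
    fmt.Println(payload.Type(), len(payload.UserIDToDeviceIDs)) // prints: V2DeviceData 2
}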