Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inflight message limiter #429

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions flowcontrol/fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,27 @@ func (fl *fixedLimiter) StartMessage(ctx context.Context, size uint64) (gotRespo
}

func (fixedLimiter) Release() {}

// NewInflightMessageLimiter returns a FlowLimiter that enforces a fixed limit on
// the number of inflight messages. This is useful when you don't know the size
// of messages a priori.
func NewInflightMessageLimiter(n int64) FlowLimiter {
sem := semaphore.NewWeighted(n)
return (*inflightMessageLimiter)(sem)
}

type inflightMessageLimiter semaphore.Weighted

func (fl *inflightMessageLimiter) StartMessage(ctx context.Context, _ uint64) (gotResponse func(), err error) {
if err = (*semaphore.Weighted)(fl).Acquire(ctx, 1); err == nil {
gotResponse = fl.gotResponse
}

return
}

func (fl *inflightMessageLimiter) gotResponse() {
(*semaphore.Weighted)(fl).Release(1)
}

func (inflightMessageLimiter) Release() {}
52 changes: 43 additions & 9 deletions flowcontrol/fixed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,78 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestFixed(t *testing.T) {
t.Parallel()

ctx := context.Background()
lim := NewFixedLimiter(10)

// Start a couple messages:
got4, err := lim.StartMessage(ctx, 4)
assert.Nil(t, err, "Limiter returned an error")
require.NoError(t, err, "Limiter returned an error")
got6, err := lim.StartMessage(ctx, 6)
assert.Nil(t, err, "Limiter returned an error")
require.NoError(t, err, "Limiter returned an error")

// We're now exactly at the limit, so if we try again it should block:
func() {
ctxTimeout, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()

_, err = lim.StartMessage(ctxTimeout, 1)
assert.NotNil(t, err, "Limiter didn't return an error")
assert.Equal(t, err, ctxTimeout.Err(), "Error wasn't from the context")
release, err := lim.StartMessage(ctxTimeout, 1)
require.ErrorIs(t, err, context.DeadlineExceeded, "Limiter didn't return an error")
assert.Nil(t, release, "should not return release function for failed call to StartMessage")
}()

// Ok, finish one of them and then it should go through again:
got4()
got1, err := lim.StartMessage(ctx, 1)
assert.Nil(t, err, "Limiter returned an error")
require.NoError(t, err, "Limiter returned an error")

// There are 10 - (6 + 1) = 3 bytes remaining. It should therefore block
// if we ask for four:
func() {
ctxTimeout, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()

_, err = lim.StartMessage(ctxTimeout, 4)
assert.NotNil(t, err, "Limiter didn't return an error")
assert.Equal(t, err, ctxTimeout.Err(), "Error wasn't from the context")
release, err := lim.StartMessage(ctxTimeout, 4)
require.ErrorIs(t, err, context.DeadlineExceeded, "Limiter didn't return an error")
assert.Nil(t, release, "should not return release function for failed call to StartMessage")
}()
got6()
got1()
}

func TestInflight(t *testing.T) {
t.Parallel()

ctx := context.Background()
lim := NewInflightMessageLimiter(2)

// Start a couple messages:
got1, err := lim.StartMessage(ctx, 4) // use arbitrary size to ensure we don't block
require.NoError(t, err, "Limiter returned an error")
got2, err := lim.StartMessage(ctx, 6)
require.NoError(t, err, "Limiter returned an error")

// We're now exactly at the limit, so if we try again it should block:
func() {
ctxTimeout, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()

release, err := lim.StartMessage(ctxTimeout, 1)
require.ErrorIs(t, err, context.DeadlineExceeded, "Limiter didn't return an error")
assert.Nil(t, release, "should not return release function for failed call to StartMessage")
}()

// Ok, finish one of them and then it should go through again:
got1()
got1, err = lim.StartMessage(ctx, 1)
assert.NoError(t, err, "Limiter returned an error")

// Clean up
got1()
got2()
}