diff --git a/go.mod b/go.mod index bbae2f604f..6835cce813 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/jacobsa/oglematchers v0.0.0-20150720000706-141901ea67cd github.com/jacobsa/oglemock v0.0.0-20150831005832-e94d794d06ff github.com/jacobsa/ogletest v0.0.0-20170503003838-80d50a735a11 - github.com/jacobsa/ratelimit v0.0.0-20150904001804-f5e47030f3b0 github.com/jacobsa/reqtrace v0.0.0-20150505043853-245c9e0234cb github.com/jacobsa/syncutil v0.0.0-20180201203307-228ac8e5a6c3 github.com/jacobsa/timeutil v0.0.0-20170205232429-577e5acbbcf6 @@ -55,6 +54,7 @@ require ( golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.6.0 // indirect golang.org/x/text v0.8.0 // indirect + golang.org/x/time v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect diff --git a/go.sum b/go.sum index 008c5ac876..b158cb6b2e 100644 --- a/go.sum +++ b/go.sum @@ -220,8 +220,6 @@ github.com/jacobsa/oglemock v0.0.0-20150831005832-e94d794d06ff h1:2xRHTvkpJ5zJmg github.com/jacobsa/oglemock v0.0.0-20150831005832-e94d794d06ff/go.mod h1:gJWba/XXGl0UoOmBQKRWCJdHrr3nE0T65t6ioaj3mLI= github.com/jacobsa/ogletest v0.0.0-20170503003838-80d50a735a11 h1:BMb8s3ENQLt5ulwVIHVDWFHp8eIXmbfSExkvdn9qMXI= github.com/jacobsa/ogletest v0.0.0-20170503003838-80d50a735a11/go.mod h1:+DBdDyfoO2McrOyDemRBq0q9CMEByef7sYl7JH5Q3BI= -github.com/jacobsa/ratelimit v0.0.0-20150904001804-f5e47030f3b0 h1:6GaIakaFrxn738iBykUc6fyS5sIAKRg/wafwzrzRX30= -github.com/jacobsa/ratelimit v0.0.0-20150904001804-f5e47030f3b0/go.mod h1:5/sdn6lSZE5l3rXMkJGO7Y3MHJImklO43rZx9ouOWYQ= github.com/jacobsa/reqtrace v0.0.0-20150505043853-245c9e0234cb h1:uSWBjJdMf47kQlXMwWEfmc864bA1wAC+Kl3ApryuG9Y= github.com/jacobsa/reqtrace v0.0.0-20150505043853-245c9e0234cb/go.mod h1:ivcmUvxXWjb27NsPEaiYK7AidlZXS7oQ5PowUS9z3I4= github.com/jacobsa/syncutil v0.0.0-20180201203307-228ac8e5a6c3 h1:+gHfvQxomE6fI4zg7QYyaGDCnuw2wylD4i6yzrQvAmY= @@ -475,6 +473,8 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/internal/gcsx/bucket_manager.go b/internal/gcsx/bucket_manager.go index a8cfa51c6a..d645564fba 100644 --- a/internal/gcsx/bucket_manager.go +++ b/internal/gcsx/bucket_manager.go @@ -20,17 +20,16 @@ import ( "path" "time" - "github.com/googlecloudplatform/gcsfuse/internal/storage" - "github.com/jacobsa/reqtrace" - "golang.org/x/net/context" - "github.com/googlecloudplatform/gcsfuse/internal/canned" "github.com/googlecloudplatform/gcsfuse/internal/logger" "github.com/googlecloudplatform/gcsfuse/internal/monitor" + "github.com/googlecloudplatform/gcsfuse/internal/ratelimit" + "github.com/googlecloudplatform/gcsfuse/internal/storage" "github.com/jacobsa/gcloud/gcs" "github.com/jacobsa/gcloud/gcs/gcscaching" - "github.com/jacobsa/ratelimit" + "github.com/jacobsa/reqtrace" "github.com/jacobsa/timeutil" + "golang.org/x/net/context" ) type BucketConfig struct { @@ -117,7 +116,7 @@ func setUpRateLimiting( // window of the given size. const window = 8 * time.Hour - opCapacity, err := ratelimit.ChooseTokenBucketCapacity( + opCapacity, err := ratelimit.ChooseLimiterCapacity( opRateLimitHz, window) @@ -126,7 +125,7 @@ func setUpRateLimiting( return } - egressCapacity, err := ratelimit.ChooseTokenBucketCapacity( + egressCapacity, err := ratelimit.ChooseLimiterCapacity( egressBandwidthLimit, window) diff --git a/internal/ratelimit/limiter_capacity.go b/internal/ratelimit/limiter_capacity.go new file mode 100644 index 0000000000..a4c095ebb0 --- /dev/null +++ b/internal/ratelimit/limiter_capacity.go @@ -0,0 +1,83 @@ +// Copyright 2023 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "fmt" + "math" + "time" +) + +// Choose a limiter capacity that ensures that the action gated by the +// limiter will be limited to within a few percent of `rateHz * window` +// for any window of the given size. +// +// This is not be possible for all rates and windows. In that case, an error +// will be returned. +func ChooseLimiterCapacity( + rateHz float64, + window time.Duration) (capacity uint64, err error) { + // Check that the input is reasonable. + if rateHz <= 0 || math.IsInf(rateHz, 0) { + err = fmt.Errorf("Illegal rate: %f", rateHz) + return + } + + if window <= 0 { + err = fmt.Errorf("Illegal window: %v", window) + return + } + + // We cannot help but allow the rate to exceed the configured maximum by some + // factor in an arbitrary window, no matter how small we scale the max + // accumulated credit -- the bucket may be full at the start of the window, + // be immediately exhausted, then be repeatedly exhausted just before filling + // throughout the window. + // + // For example: let the window W = 10 seconds, and the bandwidth B = 20 MiB/s. + // Set the max accumulated credit C = W*B/2 = 100 MiB. Then this + // sequence of events is allowed: + // + // * T=0: Allow through 100 MiB. + // * T=4.999999: Allow through nearly 100 MiB. + // * T=9.999999: Allow through nearly 100 MiB. + // + // Above we allow through nearly 300 MiB, exceeding the allowed bytes for the + // window by nearly 50%. Note however that this trend cannot continue into + // the next window, so this must be a transient spike. + // + // In general if we set C <= W*B/N, then we're off by no more than a factor + // of (N+1)/N within any window of size W. + // + // Choose a reasonable N. + const N = 50 // At most 2% error + + w := float64(window) / float64(time.Second) + capacityFloat := math.Floor(w * rateHz / N) + if !(capacityFloat >= 1 && capacityFloat < float64(math.MaxUint64)) { + err = fmt.Errorf( + "Can't use a token bucket to limit to %f Hz over a window of %v "+ + "(result is a capacity of %f)", + rateHz, + window, + capacityFloat) + + return + } + + capacity = uint64(capacityFloat) + + return +} diff --git a/internal/ratelimit/limiter_capacity_test.go b/internal/ratelimit/limiter_capacity_test.go new file mode 100644 index 0000000000..d5dc037b58 --- /dev/null +++ b/internal/ratelimit/limiter_capacity_test.go @@ -0,0 +1,96 @@ +// Copyright 2023 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "fmt" + "testing" + "time" + + . "github.com/jacobsa/ogletest" +) + +func TestLimiterCapacity(t *testing.T) { RunTests(t) } + +//////////////////////////////////////////////////////////////////////// +// Boilerplate +//////////////////////////////////////////////////////////////////////// + +type LimiterCapacityTest struct { +} + +func init() { RegisterTestSuite(&LimiterCapacityTest{}) } + +func rateLessThanOrEqualToZero(rate float64) { + _, err := ChooseLimiterCapacity(rate, 30) + + expectedError := fmt.Errorf("Illegal rate: %f", rate) + + AssertEq(expectedError.Error(), err.Error()) +} + +func (t *LimiterCapacityTest) TestRateLessThanZero() { + var negativeRateHz float64 = -1 + + rateLessThanOrEqualToZero(negativeRateHz) +} + +func (t *LimiterCapacityTest) TestRateEqualToZero() { + var zeroRateHz float64 = 0 + + rateLessThanOrEqualToZero(zeroRateHz) +} + +func windowLessThanOrEqualToZero(window time.Duration) { + _, err := ChooseLimiterCapacity(1, window) + + expectedError := fmt.Errorf("Illegal window: %v", window) + + AssertEq(expectedError.Error(), err.Error()) +} + +func (t *LimiterCapacityTest) TestWindowLessThanZero() { + var negativeWindow time.Duration = -1 + + windowLessThanOrEqualToZero(negativeWindow) +} + +func (t *LimiterCapacityTest) TestWindowEqualToZero() { + var zeroWindow time.Duration = 0 + + windowLessThanOrEqualToZero(zeroWindow) +} + +func (t *LimiterCapacityTest) TestCapacityEqualToZero() { + var rate = 0.5 + var window time.Duration = 1 + + capacity, err := ChooseLimiterCapacity(rate, window) + + expectedError := fmt.Errorf( + "Can't use a token bucket to limit to %f Hz over a window of %v (result is a capacity of %f)", rate, window, float64(capacity)) + AssertEq(expectedError.Error(), err.Error()) +} + +func (t *LimiterCapacityTest) TestExpectedCapacity() { + var rate float64 = 20 + var window = 10 * time.Second + + capacity, err := ChooseLimiterCapacity(rate, window) + // capacity = floor((20.0 * 10)/50) = floor(4.0) = 4 + + ExpectEq(nil, err) + ExpectEq(4, capacity) +} diff --git a/internal/ratelimit/throttle.go b/internal/ratelimit/throttle.go new file mode 100644 index 0000000000..4b45da0976 --- /dev/null +++ b/internal/ratelimit/throttle.go @@ -0,0 +1,58 @@ +// Copyright 2023 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "golang.org/x/net/context" + "golang.org/x/time/rate" +) + +// A simple interface for limiting the rate of some event. Unlike TokenBucket, +// does not allow the user control over what time means. +// +// Safe for concurrent access. +type Throttle interface { + // Return the maximum number of tokens that can be requested in a call to + // Wait. + Capacity() (c uint64) + + // Acquire the given number of tokens from the underlying token bucket, then + // sleep until when it says to wake. If the context is cancelled before then, + // return early with an error. + // + // REQUIRES: tokens <= capacity + Wait(ctx context.Context, tokens uint64) (err error) +} + +type limiter struct { + *rate.Limiter +} + +func NewThrottle( + rateHz float64, + capacity uint64) (t Throttle) { + t = &limiter{rate.NewLimiter(rate.Limit(rateHz), int(capacity))} + return +} + +func (l *limiter) Capacity() (c uint64) { + return uint64(l.Burst()) +} + +func (l *limiter) Wait( + ctx context.Context, + tokens uint64) (err error) { + return l.WaitN(ctx, int(tokens)) +} diff --git a/internal/ratelimit/throttle_reader_test.go b/internal/ratelimit/throttle_reader_test.go new file mode 100644 index 0000000000..25e612fefe --- /dev/null +++ b/internal/ratelimit/throttle_reader_test.go @@ -0,0 +1,335 @@ +// Copyright 2023 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "errors" + "io" + "testing" + + "golang.org/x/net/context" + + . "github.com/jacobsa/ogletest" +) + +func TestThrottledReader(t *testing.T) { RunTests(t) } + +//////////////////////////////////////////////////////////////////////// +// Helpers +//////////////////////////////////////////////////////////////////////// + +// An io.Reader that defers to a function. +type funcReader struct { + f func([]byte) (int, error) +} + +func (fr *funcReader) Read(p []byte) (n int, err error) { + n, err = fr.f(p) + return +} + +// A throttler that defers to a function. +type funcThrottle struct { + f func(context.Context, uint64) error +} + +func (ft *funcThrottle) Capacity() (c uint64) { + return 1024 +} + +func (ft *funcThrottle) Wait( + ctx context.Context, + tokens uint64) (err error) { + err = ft.f(ctx, tokens) + return +} + +//////////////////////////////////////////////////////////////////////// +// Boilerplate +//////////////////////////////////////////////////////////////////////// + +type ThrottledReaderTest struct { + ctx context.Context + + wrapped funcReader + throttle funcThrottle + + reader io.Reader +} + +var _ SetUpInterface = &ThrottledReaderTest{} + +func init() { RegisterTestSuite(&ThrottledReaderTest{}) } + +func (t *ThrottledReaderTest) SetUp(ti *TestInfo) { + t.ctx = ti.Ctx + + // Set up the default throttle function. + t.throttle.f = func(ctx context.Context, tokens uint64) (err error) { + return + } + + // Set up the reader. + t.reader = ThrottledReader(t.ctx, &t.wrapped, &t.throttle) +} + +//////////////////////////////////////////////////////////////////////// +// Tests +//////////////////////////////////////////////////////////////////////// + +func (t *ThrottledReaderTest) CallsThrottle() { + const readSize = 17 + AssertLe(readSize, t.throttle.Capacity()) + + // Throttle + var throttleCalled bool + t.throttle.f = func(ctx context.Context, tokens uint64) (err error) { + AssertFalse(throttleCalled) + throttleCalled = true + + AssertEq(t.ctx.Err(), ctx.Err()) + AssertEq(t.ctx.Done(), ctx.Done()) + AssertEq(readSize, tokens) + + err = errors.New("") + return + } + + // Call + _, err := t.reader.Read(make([]byte, readSize)) + + ExpectEq("", err.Error()) + ExpectTrue(throttleCalled) +} + +func (t *ThrottledReaderTest) ThrottleReturnsError() { + // Throttle + expectedErr := errors.New("taco") + t.throttle.f = func(ctx context.Context, tokens uint64) (err error) { + err = expectedErr + return + } + + // Call + n, err := t.reader.Read(make([]byte, 1)) + + ExpectEq(0, n) + ExpectEq(expectedErr, err) +} + +func (t *ThrottledReaderTest) CallsWrapped() { + buf := make([]byte, 16) + AssertLe(len(buf), t.throttle.Capacity()) + + // Wrapped + var readCalled bool + t.wrapped.f = func(p []byte) (n int, err error) { + AssertFalse(readCalled) + readCalled = true + + AssertEq(&buf[0], &p[0]) + AssertEq(len(buf), len(p)) + + err = errors.New("") + return + } + + // Call + _, err := t.reader.Read(buf) + + ExpectEq("", err.Error()) + ExpectTrue(readCalled) +} + +func (t *ThrottledReaderTest) WrappedReturnsError() { + // Wrapped + expectedErr := errors.New("taco") + t.wrapped.f = func(p []byte) (n int, err error) { + n = 11 + err = expectedErr + return + } + + // Call + n, err := t.reader.Read(make([]byte, 16)) + + ExpectEq(11, n) + ExpectEq(expectedErr, err) +} + +func (t *ThrottledReaderTest) WrappedReturnsEOF() { + // Wrapped + t.wrapped.f = func(p []byte) (n int, err error) { + n = 11 + err = io.EOF + return + } + + // Call + n, err := t.reader.Read(make([]byte, 16)) + + ExpectEq(11, n) + ExpectEq(io.EOF, err) +} + +func (t *ThrottledReaderTest) WrappedReturnsFullRead() { + const readSize = 17 + AssertLe(readSize, t.throttle.Capacity()) + + // Wrapped + t.wrapped.f = func(p []byte) (n int, err error) { + n = len(p) + return + } + + // Call + n, err := t.reader.Read(make([]byte, readSize)) + + ExpectEq(nil, err) + ExpectEq(readSize, n) +} + +func (t *ThrottledReaderTest) WrappedReturnsShortRead_CallsAgain() { + buf := make([]byte, 16) + AssertLe(len(buf), t.throttle.Capacity()) + + // Wrapped + var callCount int + t.wrapped.f = func(p []byte) (n int, err error) { + AssertLt(callCount, 2) + switch callCount { + case 0: + callCount++ + n = 2 + + case 1: + callCount++ + AssertEq(&buf[2], &p[0]) + AssertEq(len(buf)-2, len(p)) + err = errors.New("") + } + + return + } + + // Call + _, err := t.reader.Read(buf) + + ExpectEq("", err.Error()) + ExpectEq(2, callCount) +} + +func (t *ThrottledReaderTest) WrappedReturnsShortRead_SecondReturnsError() { + // Wrapped + var callCount int + expectedErr := errors.New("taco") + + t.wrapped.f = func(p []byte) (n int, err error) { + AssertLt(callCount, 2) + switch callCount { + case 0: + callCount++ + n = 2 + + case 1: + callCount++ + n = 11 + err = expectedErr + } + + return + } + + // Call + n, err := t.reader.Read(make([]byte, 16)) + + ExpectEq(2+11, n) + ExpectEq(expectedErr, err) +} + +func (t *ThrottledReaderTest) WrappedReturnsShortRead_SecondReturnsEOF() { + // Wrapped + var callCount int + t.wrapped.f = func(p []byte) (n int, err error) { + AssertLt(callCount, 2) + switch callCount { + case 0: + callCount++ + n = 2 + + case 1: + callCount++ + n = 11 + err = io.EOF + } + + return + } + + // Call + n, err := t.reader.Read(make([]byte, 16)) + + ExpectEq(2+11, n) + ExpectEq(io.EOF, err) +} + +func (t *ThrottledReaderTest) WrappedReturnsShortRead_SecondSucceedsInFull() { + // Wrapped + var callCount int + t.wrapped.f = func(p []byte) (n int, err error) { + AssertLt(callCount, 2) + switch callCount { + case 0: + callCount++ + n = 2 + + case 1: + callCount++ + n = len(p) + } + + return + } + + // Call + n, err := t.reader.Read(make([]byte, 16)) + + ExpectEq(16, n) + ExpectEq(nil, err) +} + +func (t *ThrottledReaderTest) ReadSizeIsAboveThrottleCapacity() { + buf := make([]byte, 2048) + AssertGt(len(buf), t.throttle.Capacity()) + + // Wrapped + var readCalled bool + t.wrapped.f = func(p []byte) (n int, err error) { + AssertFalse(readCalled) + readCalled = true + + AssertEq(&buf[0], &p[0]) + ExpectEq(t.throttle.Capacity(), len(p)) + + err = errors.New("") + return + } + + // Call + _, err := t.reader.Read(buf) + + ExpectEq("", err.Error()) + ExpectTrue(readCalled) +} diff --git a/internal/ratelimit/throttle_test.go b/internal/ratelimit/throttle_test.go new file mode 100644 index 0000000000..3a6a7ada67 --- /dev/null +++ b/internal/ratelimit/throttle_test.go @@ -0,0 +1,209 @@ +// Copyright 2023 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// It is performing integration tests for throttle.go +// Set up several test cases where we have N goroutines simulating the arrival of +// packets at a given rate, asking a limiter when to admit them. +// limiter can accept number of packets equivalent to capacity. After that, +// it will wait until limiter get space to receive the new packet. +package ratelimit_test + +import ( + cryptorand "crypto/rand" + "io" + "math/rand" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/googlecloudplatform/gcsfuse/internal/ratelimit" + "golang.org/x/net/context" + + . "github.com/jacobsa/oglematchers" + . "github.com/jacobsa/ogletest" +) + +func TestThrottle(t *testing.T) { RunTests(t) } + +//////////////////////////////////////////////////////////////////////// +// Helpers +//////////////////////////////////////////////////////////////////////// + +func makeSeed() (seed int64) { + var buf [8]byte + _, err := io.ReadFull(cryptorand.Reader, buf[:]) + if err != nil { + panic(err) + } + + seed = (int64(buf[0])>>1)<<56 | + int64(buf[1])<<48 | + int64(buf[2])<<40 | + int64(buf[3])<<32 | + int64(buf[4])<<24 | + int64(buf[5])<<16 | + int64(buf[6])<<8 | + int64(buf[7])<<0 + + return +} + +func processArrivals( + ctx context.Context, + throttle ratelimit.Throttle, + arrivalRateHz float64, + d time.Duration) (processed uint64) { + // Set up an independent source of randomness. + randSrc := rand.New(rand.NewSource(makeSeed())) + + // Tick into a channel at a steady rate, buffering over delays caused by the + // limiter. + arrivalPeriod := time.Duration((1.0 / arrivalRateHz) * float64(time.Second)) + ticks := make(chan struct{}, 3*int(float64(d)/float64(arrivalPeriod))) + + go func() { + ticker := time.NewTicker(arrivalPeriod) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + select { + case ticks <- struct{}{}: + default: + panic("Buffer exceeded?") + } + } + } + }() + + // Simulate until we're supposed to stop. + for { + // Accumulate a few packets. + toAccumulate := uint64(randSrc.Int63n(5)) + + var accumulated uint64 + for accumulated < toAccumulate { + select { + case <-ctx.Done(): + return + + case <-ticks: + accumulated++ + } + } + + // Wait. + err := throttle.Wait(ctx, accumulated) + if err != nil { + return + } + + processed += accumulated + } +} + +//////////////////////////////////////////////////////////////////////// +// Boilerplate +//////////////////////////////////////////////////////////////////////// + +type ThrottleTest struct { +} + +func init() { RegisterTestSuite(&ThrottleTest{}) } + +//////////////////////////////////////////////////////////////////////// +// Tests +//////////////////////////////////////////////////////////////////////// + +func (t *ThrottleTest) IntegrationTest() { + runtime.GOMAXPROCS(runtime.NumCPU()) + const perCaseDuration = 1 * time.Second + + // Set up several test cases where we have N goroutines simulating arrival of + // packets at a given rate, asking a limiter when to admit them. + testCases := []struct { + numActors int + arrivalRateHz float64 + limitRateHz float64 + }{ + // Single actor + {1, 150, 200}, + {1, 200, 200}, + {1, 250, 200}, + + // Multiple actors + {4, 150, 200}, + {4, 200, 200}, + {4, 250, 200}, + } + + // Run each test case. + for i, tc := range testCases { + // Create a throttle. + capacity, err := ratelimit.ChooseLimiterCapacity( + tc.limitRateHz, + perCaseDuration) + + AssertEq(nil, err) + + throttle := ratelimit.NewThrottle(tc.limitRateHz, capacity) + + // Start workers. + var wg sync.WaitGroup + var totalProcessed uint64 + + ctx, _ := context.WithDeadline( + context.Background(), + time.Now().Add(perCaseDuration)) + + for i := 0; i < tc.numActors; i++ { + wg.Add(1) + go func() { + defer wg.Done() + processed := processArrivals( + ctx, + throttle, + tc.arrivalRateHz/float64(tc.numActors), + perCaseDuration) + + atomic.AddUint64(&totalProcessed, processed) + }() + } + + // Wait for them all to finish. + wg.Wait() + + // We should have processed about the correct number of arrivals. + smallerRateHz := tc.arrivalRateHz + if smallerRateHz > tc.limitRateHz { + smallerRateHz = tc.limitRateHz + } + + expected := smallerRateHz * (float64(perCaseDuration) / float64(time.Second)) + ExpectThat( + totalProcessed, + AllOf( + GreaterThan(expected*0.90), + LessThan(expected*1.10)), + "Test case %d. expected: %f", + i, + expected) + } +} diff --git a/internal/ratelimit/throttled_bucket.go b/internal/ratelimit/throttled_bucket.go new file mode 100644 index 0000000000..82112ef4b1 --- /dev/null +++ b/internal/ratelimit/throttled_bucket.go @@ -0,0 +1,202 @@ +// Copyright 2023 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "io" + + "github.com/jacobsa/gcloud/gcs" + "golang.org/x/net/context" +) + +// Create a bucket that limits the rate at which it calls the wrapped bucket +// using opThrottle, and limits the bandwidth with which it reads from the +// wrapped bucket using egressThrottle. +func NewThrottledBucket( + opThrottle Throttle, + egressThrottle Throttle, + wrapped gcs.Bucket) (b gcs.Bucket) { + b = &throttledBucket{ + opThrottle: opThrottle, + egressThrottle: egressThrottle, + wrapped: wrapped, + } + return +} + +//////////////////////////////////////////////////////////////////////// +// throttledBucket +//////////////////////////////////////////////////////////////////////// + +type throttledBucket struct { + opThrottle Throttle + egressThrottle Throttle + wrapped gcs.Bucket +} + +func (b *throttledBucket) Name() string { + return b.wrapped.Name() +} + +func (b *throttledBucket) NewReader( + ctx context.Context, + req *gcs.ReadObjectRequest) (rc io.ReadCloser, err error) { + // Wait for permission to call through. + + err = b.opThrottle.Wait(ctx, 1) + if err != nil { + return + } + + // Call through. + rc, err = b.wrapped.NewReader(ctx, req) + if err != nil { + return + } + + // Wrap the result in a throttled layer. + rc = &readerCloser{ + Reader: ThrottledReader(ctx, rc, b.egressThrottle), + Closer: rc, + } + + return +} + +func (b *throttledBucket) CreateObject( + ctx context.Context, + req *gcs.CreateObjectRequest) (o *gcs.Object, err error) { + // Wait for permission to call through. + err = b.opThrottle.Wait(ctx, 1) + if err != nil { + return + } + + // Call through. + o, err = b.wrapped.CreateObject(ctx, req) + + return +} + +func (b *throttledBucket) CopyObject( + ctx context.Context, + req *gcs.CopyObjectRequest) (o *gcs.Object, err error) { + // Wait for permission to call through. + err = b.opThrottle.Wait(ctx, 1) + if err != nil { + return + } + + // Call through. + o, err = b.wrapped.CopyObject(ctx, req) + + return +} + +func (b *throttledBucket) ComposeObjects( + ctx context.Context, + req *gcs.ComposeObjectsRequest) (o *gcs.Object, err error) { + // Wait for permission to call through. + err = b.opThrottle.Wait(ctx, 1) + if err != nil { + return + } + + // Call through. + o, err = b.wrapped.ComposeObjects(ctx, req) + + return +} + +func (b *throttledBucket) StatObject( + ctx context.Context, + req *gcs.StatObjectRequest) (o *gcs.Object, err error) { + // Wait for permission to call through. + err = b.opThrottle.Wait(ctx, 1) + if err != nil { + return + } + + // Call through. + o, err = b.wrapped.StatObject(ctx, req) + + return +} + +func (b *throttledBucket) ListObjects( + ctx context.Context, + req *gcs.ListObjectsRequest) (listing *gcs.Listing, err error) { + // Wait for permission to call through. + err = b.opThrottle.Wait(ctx, 1) + if err != nil { + return + } + + // Call through. + listing, err = b.wrapped.ListObjects(ctx, req) + + return +} + +func (b *throttledBucket) UpdateObject( + ctx context.Context, + req *gcs.UpdateObjectRequest) (o *gcs.Object, err error) { + // Wait for permission to call through. + err = b.opThrottle.Wait(ctx, 1) + if err != nil { + return + } + + // Call through. + o, err = b.wrapped.UpdateObject(ctx, req) + + return +} + +func (b *throttledBucket) DeleteObject( + ctx context.Context, + req *gcs.DeleteObjectRequest) (err error) { + // Wait for permission to call through. + err = b.opThrottle.Wait(ctx, 1) + if err != nil { + return + } + + // Call through. + err = b.wrapped.DeleteObject(ctx, req) + + return +} + +//////////////////////////////////////////////////////////////////////// +// readerCloser +//////////////////////////////////////////////////////////////////////// + +// An io.ReadCloser that forwards read requests to an io.Reader and close +// requests to an io.Closer. +type readerCloser struct { + Reader io.Reader + Closer io.Closer +} + +func (rc *readerCloser) Read(p []byte) (n int, err error) { + n, err = rc.Reader.Read(p) + return +} + +func (rc *readerCloser) Close() (err error) { + err = rc.Closer.Close() + return +} diff --git a/internal/ratelimit/throttled_reader.go b/internal/ratelimit/throttled_reader.go new file mode 100644 index 0000000000..5794a02cbe --- /dev/null +++ b/internal/ratelimit/throttled_reader.go @@ -0,0 +1,66 @@ +// Copyright 2023 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "io" + + "golang.org/x/net/context" +) + +// Create a reader that limits the bandwidth of reads made from r according to +// the supplied throttler. Reads are assumed to be made under the supplied +// context. +func ThrottledReader( + ctx context.Context, + r io.Reader, + throttle Throttle) io.Reader { + return &throttledReader{ + ctx: ctx, + wrapped: r, + throttle: throttle, + } +} + +type throttledReader struct { + ctx context.Context + wrapped io.Reader + throttle Throttle +} + +func (tr *throttledReader) Read(p []byte) (n int, err error) { + // We can't serve a read larger than the throttle's capacity. + if uint64(len(p)) > tr.throttle.Capacity() { + p = p[:tr.throttle.Capacity()] + } + + // Wait for permission to continue. + err = tr.throttle.Wait(tr.ctx, uint64(len(p))) + if err != nil { + return + } + + // Serve the full amount we acquired from the throttle (unless we hit an + // early error, including EOF). + for len(p) > 0 && err == nil { + var tmp int + tmp, err = tr.wrapped.Read(p) + + n += tmp + p = p[tmp:] + } + + return +}