Skip to content

Commit

Permalink
Removing rate limit dependency (#1253)
Browse files Browse the repository at this point in the history
* local changes

* local changes

* local changes

* local changes

* removed token bucket and added unit tests

* fixing lint

* small fix- renaming

* small fix- renaming

* fix lint

* adding licence

* testing

* back to changes

* fixing comments

* removing throttle_test

* adding throttle test

* fixing comment

* fixing lint

* fixed comment

* lint tests

* lint tests

* fixing comment

* empty commit

* empty commit
  • Loading branch information
Tulsishah authored Aug 9, 2023
1 parent 7052657 commit 4efd86e
Show file tree
Hide file tree
Showing 10 changed files with 1,058 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/jacobsa/oglematchers v0.0.0-20150720000706-141901ea67cd
github.com/jacobsa/oglemock v0.0.0-20150831005832-e94d794d06ff
github.com/jacobsa/ogletest v0.0.0-20170503003838-80d50a735a11
github.com/jacobsa/ratelimit v0.0.0-20150904001804-f5e47030f3b0
github.com/jacobsa/reqtrace v0.0.0-20150505043853-245c9e0234cb
github.com/jacobsa/syncutil v0.0.0-20180201203307-228ac8e5a6c3
github.com/jacobsa/timeutil v0.0.0-20170205232429-577e5acbbcf6
Expand Down Expand Up @@ -55,6 +54,7 @@ require (
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,6 @@ github.com/jacobsa/oglemock v0.0.0-20150831005832-e94d794d06ff h1:2xRHTvkpJ5zJmg
github.com/jacobsa/oglemock v0.0.0-20150831005832-e94d794d06ff/go.mod h1:gJWba/XXGl0UoOmBQKRWCJdHrr3nE0T65t6ioaj3mLI=
github.com/jacobsa/ogletest v0.0.0-20170503003838-80d50a735a11 h1:BMb8s3ENQLt5ulwVIHVDWFHp8eIXmbfSExkvdn9qMXI=
github.com/jacobsa/ogletest v0.0.0-20170503003838-80d50a735a11/go.mod h1:+DBdDyfoO2McrOyDemRBq0q9CMEByef7sYl7JH5Q3BI=
github.com/jacobsa/ratelimit v0.0.0-20150904001804-f5e47030f3b0 h1:6GaIakaFrxn738iBykUc6fyS5sIAKRg/wafwzrzRX30=
github.com/jacobsa/ratelimit v0.0.0-20150904001804-f5e47030f3b0/go.mod h1:5/sdn6lSZE5l3rXMkJGO7Y3MHJImklO43rZx9ouOWYQ=
github.com/jacobsa/reqtrace v0.0.0-20150505043853-245c9e0234cb h1:uSWBjJdMf47kQlXMwWEfmc864bA1wAC+Kl3ApryuG9Y=
github.com/jacobsa/reqtrace v0.0.0-20150505043853-245c9e0234cb/go.mod h1:ivcmUvxXWjb27NsPEaiYK7AidlZXS7oQ5PowUS9z3I4=
github.com/jacobsa/syncutil v0.0.0-20180201203307-228ac8e5a6c3 h1:+gHfvQxomE6fI4zg7QYyaGDCnuw2wylD4i6yzrQvAmY=
Expand Down Expand Up @@ -475,6 +473,8 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
13 changes: 6 additions & 7 deletions internal/gcsx/bucket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ import (
"path"
"time"

"github.com/googlecloudplatform/gcsfuse/internal/storage"
"github.com/jacobsa/reqtrace"
"golang.org/x/net/context"

"github.com/googlecloudplatform/gcsfuse/internal/canned"
"github.com/googlecloudplatform/gcsfuse/internal/logger"
"github.com/googlecloudplatform/gcsfuse/internal/monitor"
"github.com/googlecloudplatform/gcsfuse/internal/ratelimit"
"github.com/googlecloudplatform/gcsfuse/internal/storage"
"github.com/jacobsa/gcloud/gcs"
"github.com/jacobsa/gcloud/gcs/gcscaching"
"github.com/jacobsa/ratelimit"
"github.com/jacobsa/reqtrace"
"github.com/jacobsa/timeutil"
"golang.org/x/net/context"
)

type BucketConfig struct {
Expand Down Expand Up @@ -117,7 +116,7 @@ func setUpRateLimiting(
// window of the given size.
const window = 8 * time.Hour

opCapacity, err := ratelimit.ChooseTokenBucketCapacity(
opCapacity, err := ratelimit.ChooseLimiterCapacity(
opRateLimitHz,
window)

Expand All @@ -126,7 +125,7 @@ func setUpRateLimiting(
return
}

egressCapacity, err := ratelimit.ChooseTokenBucketCapacity(
egressCapacity, err := ratelimit.ChooseLimiterCapacity(
egressBandwidthLimit,
window)

Expand Down
83 changes: 83 additions & 0 deletions internal/ratelimit/limiter_capacity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"fmt"
"math"
"time"
)

// Choose a limiter capacity that ensures that the action gated by the
// limiter will be limited to within a few percent of `rateHz * window`
// for any window of the given size.
//
// This is not be possible for all rates and windows. In that case, an error
// will be returned.
func ChooseLimiterCapacity(
rateHz float64,
window time.Duration) (capacity uint64, err error) {
// Check that the input is reasonable.
if rateHz <= 0 || math.IsInf(rateHz, 0) {
err = fmt.Errorf("Illegal rate: %f", rateHz)
return
}

if window <= 0 {
err = fmt.Errorf("Illegal window: %v", window)
return
}

// We cannot help but allow the rate to exceed the configured maximum by some
// factor in an arbitrary window, no matter how small we scale the max
// accumulated credit -- the bucket may be full at the start of the window,
// be immediately exhausted, then be repeatedly exhausted just before filling
// throughout the window.
//
// For example: let the window W = 10 seconds, and the bandwidth B = 20 MiB/s.
// Set the max accumulated credit C = W*B/2 = 100 MiB. Then this
// sequence of events is allowed:
//
// * T=0: Allow through 100 MiB.
// * T=4.999999: Allow through nearly 100 MiB.
// * T=9.999999: Allow through nearly 100 MiB.
//
// Above we allow through nearly 300 MiB, exceeding the allowed bytes for the
// window by nearly 50%. Note however that this trend cannot continue into
// the next window, so this must be a transient spike.
//
// In general if we set C <= W*B/N, then we're off by no more than a factor
// of (N+1)/N within any window of size W.
//
// Choose a reasonable N.
const N = 50 // At most 2% error

w := float64(window) / float64(time.Second)
capacityFloat := math.Floor(w * rateHz / N)
if !(capacityFloat >= 1 && capacityFloat < float64(math.MaxUint64)) {
err = fmt.Errorf(
"Can't use a token bucket to limit to %f Hz over a window of %v "+
"(result is a capacity of %f)",
rateHz,
window,
capacityFloat)

return
}

capacity = uint64(capacityFloat)

return
}
96 changes: 96 additions & 0 deletions internal/ratelimit/limiter_capacity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2023 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"fmt"
"testing"
"time"

. "github.com/jacobsa/ogletest"
)

func TestLimiterCapacity(t *testing.T) { RunTests(t) }

////////////////////////////////////////////////////////////////////////
// Boilerplate
////////////////////////////////////////////////////////////////////////

type LimiterCapacityTest struct {
}

func init() { RegisterTestSuite(&LimiterCapacityTest{}) }

func rateLessThanOrEqualToZero(rate float64) {
_, err := ChooseLimiterCapacity(rate, 30)

expectedError := fmt.Errorf("Illegal rate: %f", rate)

AssertEq(expectedError.Error(), err.Error())
}

func (t *LimiterCapacityTest) TestRateLessThanZero() {
var negativeRateHz float64 = -1

rateLessThanOrEqualToZero(negativeRateHz)
}

func (t *LimiterCapacityTest) TestRateEqualToZero() {
var zeroRateHz float64 = 0

rateLessThanOrEqualToZero(zeroRateHz)
}

func windowLessThanOrEqualToZero(window time.Duration) {
_, err := ChooseLimiterCapacity(1, window)

expectedError := fmt.Errorf("Illegal window: %v", window)

AssertEq(expectedError.Error(), err.Error())
}

func (t *LimiterCapacityTest) TestWindowLessThanZero() {
var negativeWindow time.Duration = -1

windowLessThanOrEqualToZero(negativeWindow)
}

func (t *LimiterCapacityTest) TestWindowEqualToZero() {
var zeroWindow time.Duration = 0

windowLessThanOrEqualToZero(zeroWindow)
}

func (t *LimiterCapacityTest) TestCapacityEqualToZero() {
var rate = 0.5
var window time.Duration = 1

capacity, err := ChooseLimiterCapacity(rate, window)

expectedError := fmt.Errorf(
"Can't use a token bucket to limit to %f Hz over a window of %v (result is a capacity of %f)", rate, window, float64(capacity))
AssertEq(expectedError.Error(), err.Error())
}

func (t *LimiterCapacityTest) TestExpectedCapacity() {
var rate float64 = 20
var window = 10 * time.Second

capacity, err := ChooseLimiterCapacity(rate, window)
// capacity = floor((20.0 * 10)/50) = floor(4.0) = 4

ExpectEq(nil, err)
ExpectEq(4, capacity)
}
58 changes: 58 additions & 0 deletions internal/ratelimit/throttle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

// A simple interface for limiting the rate of some event. Unlike TokenBucket,
// does not allow the user control over what time means.
//
// Safe for concurrent access.
type Throttle interface {
// Return the maximum number of tokens that can be requested in a call to
// Wait.
Capacity() (c uint64)

// Acquire the given number of tokens from the underlying token bucket, then
// sleep until when it says to wake. If the context is cancelled before then,
// return early with an error.
//
// REQUIRES: tokens <= capacity
Wait(ctx context.Context, tokens uint64) (err error)
}

type limiter struct {
*rate.Limiter
}

func NewThrottle(
rateHz float64,
capacity uint64) (t Throttle) {
t = &limiter{rate.NewLimiter(rate.Limit(rateHz), int(capacity))}
return
}

func (l *limiter) Capacity() (c uint64) {
return uint64(l.Burst())
}

func (l *limiter) Wait(
ctx context.Context,
tokens uint64) (err error) {
return l.WaitN(ctx, int(tokens))
}
Loading

0 comments on commit 4efd86e

Please sign in to comment.