Skip to content

Commit

Permalink
Merge pull request #4 from un000/rate-limiter-and-leaky-bucket
Browse files Browse the repository at this point in the history
Rate limiter and leaky bucket
  • Loading branch information
un000 authored Jun 3, 2019
2 parents f8be968 + 236a4d2 commit 34ae62e
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ go get github.com/un000/tailor
## TODO
- [x] Better Test Code Coverage
- [ ] Benchmarks
- [ ] Rate limiter + Leaky Bucket
- [x] Rate limiter + Leaky Bucket

## Example
```
Expand Down
17 changes: 17 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type options struct {
bufioReaderPoolSize int
pollerTimeout time.Duration
updateLagInterval time.Duration

rateLimiter RateLimiter
leakyBucket bool
}

// withDefaultOptions sets the initial options.
Expand Down Expand Up @@ -75,3 +78,17 @@ func WithUpdateLagInterval(duration time.Duration) Option {
options.updateLagInterval = duration
}
}

// WithRateLimiter is used to rate limit output lines. Watch RateLimiter interface.
func WithRateLimiter(rl RateLimiter) Option {
return func(options *options) {
options.rateLimiter = rl
}
}

// WithLeakyBucket is used to skip a read lines, when a listener is full.
func WithLeakyBucket() Option {
return func(options *options) {
options.leakyBucket = true
}
}
14 changes: 14 additions & 0 deletions rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2019 Yegor Myskin. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package tailor

// RateLimiter provides methods to create a custom rate limiter.
type RateLimiter interface {
// Allow says that a line should be sent to a receiver of a lines.
Allow() bool

// Close finalizes rate limiter.
Close()
}
26 changes: 26 additions & 0 deletions rate_limiter_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package tailor

import (
"time"
)

type ChannelBasedRateLimiter struct {
t *time.Ticker
}

// NewChannelBasedRateLimiter creates an instance of rate limiter, which ticker ticks every period to limit the lps.
func NewChannelBasedRateLimiter(lps int) *ChannelBasedRateLimiter {
return &ChannelBasedRateLimiter{
t: time.NewTicker(time.Second / time.Duration(lps)),
}
}

// Allow will block until the ticker ticks.
func (rl *ChannelBasedRateLimiter) Allow() bool {
_, ok := <-rl.t.C
return ok
}

func (rl *ChannelBasedRateLimiter) Close() {
rl.t.Stop()
}
27 changes: 27 additions & 0 deletions rate_limiter_channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2019 Yegor Myskin. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package tailor

import (
"testing"
"time"
)

func TestChannelBasedRateLimiterDisallow(t *testing.T) {
l := NewChannelBasedRateLimiter(30)
defer l.Close()

start := time.Now()
for i := 0; i < 100; i++ {
if !l.Allow() {
t.FailNow()
}
}
dur := time.Since(start)

if dur < 3333*time.Millisecond || dur > 3500*time.Millisecond {
t.Errorf("expected duration: ~3.33s, actual: %s", dur)
}
}
4 changes: 4 additions & 0 deletions seeker_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2019 Yegor Myskin. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package tailor

import (
Expand Down
24 changes: 21 additions & 3 deletions tailor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Tailor struct {

opts options

// stats
lastPos int64
lastSize int64
lag int64
Expand Down Expand Up @@ -90,6 +91,11 @@ func (t *Tailor) readLoop(ctx context.Context) {
}
close(t.lines)
close(t.errs)

if t.opts.rateLimiter != nil {
t.opts.rateLimiter.Close()
}

atomic.StoreInt32(&t.working, 0)
}()

Expand Down Expand Up @@ -183,9 +189,21 @@ func (t *Tailor) readLoop(ctx context.Context) {

pollerTimeout = t.opts.pollerTimeout

t.lines <- Line{
line: line,
fileName: t.fileName,
if t.opts.rateLimiter == nil || t.opts.rateLimiter.Allow() {
line := Line{
line: line,
fileName: t.fileName,
}

if t.opts.leakyBucket {
select {
case t.lines <- line:
default:
}
continue
}

t.lines <- line
}
}
}()
Expand Down

0 comments on commit 34ae62e

Please sign in to comment.