From 72b3cd427a3495479f72a3a06c8949e39485dfc0 Mon Sep 17 00:00:00 2001 From: Umputun Date: Sat, 2 Feb 2019 22:05:20 -0600 Subject: [PATCH] remove unneeded interfaces, extend sized group with the same params as err group --- LICENSE | 2 +- README.md | 15 ++++++----- errsizedgroup.go | 60 ++++++++----------------------------------- errsizedgroup_test.go | 13 +++++----- go.mod | 3 +++ go.sum | 7 +++++ group_options.go | 30 ++++++++++++++++++++++ semaphore_test.go | 14 +++++----- sizedgroup.go | 55 +++++++++++++++++++++++++++++---------- sizedgroup_test.go | 44 ++++++++++++++++++++++++++++--- 10 files changed, 157 insertions(+), 86 deletions(-) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 group_options.go diff --git a/LICENSE b/LICENSE index ca12521..ac54025 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2018 Umputun +Copyright (c) 2019 Umputun Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 3acf338..9599cb9 100644 --- a/README.md +++ b/README.md @@ -29,13 +29,14 @@ Implements `sync.Locker` interface but for given capacity, thread safe. Lock inc Mix semaphore and WaitGroup to provide sized waiting group. The result is a wait group allowing limited number of goroutine to run in parallel. -The locking happens inside of goroutine, i.e. **every call will be non-blocked**, but some goroutines may wait if semaphore locked. It means - technically it doesn't limit number of goroutines, but rather number of running (active) goroutines. +By default the locking happens inside of goroutine, i.e. **every call will be non-blocked**, but some goroutines may wait if semaphore locked. It means - technically it doesn't limit number of goroutines, but rather number of running (active) goroutines. +In order to block goroutines from even starting use `Preemptive` option (see below).
```go swg := syncs.NewSizedGroup(5) // wait group with max size=5 for i :=0; i<10; i++ { - swg.Go(fn func(){ - doThings() // only 5 of these will run in parallel + swg.Go(func(ctx context.Context){ + doThings(ctx) // only 5 of these will run in parallel }) } swg.Wait() @@ -48,17 +49,17 @@ Works the same as errgrp.Group, i.e. returns first error. Can work as regular errgrp.Group or with early termination. Thread safe. -Supports both in-goroutine-wait via `NewErrSizedGroup` as well as outside of goroutine wait with `Preemptive()` option. Another options are `TermOnErr` which will skip (won't start) all other goroutines if any error returned, and `Context`. +Supports both in-goroutine-wait via `NewErrSizedGroup` as well as outside of goroutine wait with `Preemptive` option. Other options are `TermOnErr` which will skip (won't start) all other goroutines if any error returned, and `Context` for early termination/timeouts. Important! With `Preemptive` Go call **can block**. In case if maximum size reached the call will wait till number of running goroutines dropped under max. This way we not only limiting number of running goroutines but also number of waiting goroutines.
```go - ewg := syncs.NewErrSizedGroup(5, syncs.Preemptive()) // error wait group with max size=5, don't try to start more if any error happened + ewg := syncs.NewErrSizedGroup(5, syncs.Preemptive) // error wait group with max size=5, don't try to start more if any error happened for i :=0; i<10; i++ { - ewg.Go(fn func() error { // Go here could be blocked if trying to run >5 at the same time - err := doThings() // only 5 of these will run in parallel + ewg.Go(func(ctx context.Context) error { // Go here could be blocked if trying to run >5 at the same time + err := doThings(ctx) // only 5 of these will run in parallel return err }) } diff --git a/errsizedgroup.go b/errsizedgroup.go index bbe02ea..356b050 100644 --- a/errsizedgroup.go +++ b/errsizedgroup.go @@ -1,9 +1,7 @@ package syncs import ( - "context" "fmt" - "log" "strings" "sync" ) @@ -11,18 +9,10 @@ import ( // ErrSizedGroup is a SizedGroup with error control. Works the same as errgrp.Group, i.e. returns first error. // Can work as regular errgrp.Group or with early termination. Thread safe. // ErrSizedGroup interface enforces constructor usage and doesn't allow direct creation of errSizedGroup -type ErrSizedGroup interface { - Go(fn func() error) - Wait() error -} - -type errSizedGroup struct { - wg sync.WaitGroup - sema sync.Locker - ctx context.Context - cancel func() - termOnError bool - preLock bool +type ErrSizedGroup struct { + options + wg sync.WaitGroup + sema sync.Locker err *multierror errLock sync.RWMutex @@ -32,16 +22,15 @@ type errSizedGroup struct { // NewErrSizedGroup makes wait group with limited size alive goroutines. // By default all goroutines will be started but will wait inside. For limited number of goroutines use Preemptive() options. // TermOnErr will skip (won't start) all other goroutines if any error returned.
-func NewErrSizedGroup(size int, options ...ESGOption) ErrSizedGroup { - res := errSizedGroup{ +func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup { + + res := ErrSizedGroup{ sema: NewSemaphore(size), err: new(multierror), } for _, opt := range options { - if err := opt(&res); err != nil { - log.Printf("[WARN] failed to set cache option, %v", err) - } + opt(&res.options) } return &res @@ -50,7 +39,7 @@ func NewErrSizedGroup(size int, options ...ESGOption) ErrSizedGroup { // Go calls the given function in a new goroutine. // The first call to return a non-nil error cancels the group if termOnError; its error will be // returned by Wait. If no termOnError all errors will be collected in multierror. -func (g *errSizedGroup) Go(f func() error) { +func (g *ErrSizedGroup) Go(f func() error) { g.wg.Add(1) @@ -97,7 +86,7 @@ func (g *errSizedGroup) Go(f func() error) { // Wait blocks until all function calls from the Go method have returned, then // returns the first all errors (if any) wrapped with multierror from them. -func (g *errSizedGroup) Wait() error { +func (g *ErrSizedGroup) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel() @@ -105,35 +94,6 @@ func (g *errSizedGroup) Wait() error { return g.err.errorOrNil() } -// ESGOption functional option type -type ESGOption func(esg *errSizedGroup) error - -// Context passes ctx and makes it cancelable -func Context(ctx context.Context) ESGOption { - return func(esg *errSizedGroup) error { - ctxWithCancel, cancel := context.WithCancel(ctx) - esg.cancel = cancel - esg.ctx = ctxWithCancel - return nil - } -} - -// Preemptive sets locking mode preventing spawning waiting goroutine. May cause Go call to block! 
-func Preemptive() ESGOption { - return func(esg *errSizedGroup) error { - esg.preLock = true - return nil - } -} - -// TermOnErr prevents new goroutines to start after first error -func TermOnErr() ESGOption { - return func(esg *errSizedGroup) error { - esg.termOnError = true - return nil - } -} - type multierror struct { errors []error lock sync.Mutex diff --git a/errsizedgroup_test.go b/errsizedgroup_test.go index f705ea8..d28a6f7 100644 --- a/errsizedgroup_test.go +++ b/errsizedgroup_test.go @@ -20,7 +20,7 @@ func TestErrorSizedGroup(t *testing.T) { for i := 0; i < 1000; i++ { i := i ewg.Go(func() error { - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 10) atomic.AddUint32(&c, 1) if i == 100 { return errors.New("err1") @@ -31,15 +31,15 @@ func TestErrorSizedGroup(t *testing.T) { return nil }) } - assert.True(t, runtime.NumGoroutine() > 900, "goroutines %d", runtime.NumGoroutine()) + assert.True(t, runtime.NumGoroutine() > 500, "goroutines %d", runtime.NumGoroutine()) err := ewg.Wait() assert.True(t, strings.HasPrefix(err.Error(), "2 error(s) occurred:")) assert.Equal(t, uint32(1000), c, fmt.Sprintf("%d, not all routines have been executed.", c)) } -func TestErrorSizedGroupPreGo(t *testing.T) { - ewg := NewErrSizedGroup(10, Preemptive()) +func TestErrorSizedGroupPreemptive(t *testing.T) { + ewg := NewErrSizedGroup(10, Preemptive) var c uint32 for i := 0; i < 1000; i++ { @@ -58,6 +58,7 @@ func TestErrorSizedGroupPreGo(t *testing.T) { }) } + assert.True(t, runtime.NumGoroutine() <= 20, "goroutines %d", runtime.NumGoroutine()) err := ewg.Wait() assert.True(t, strings.HasPrefix(err.Error(), "2 error(s) occurred:")) assert.Equal(t, uint32(1000), c, fmt.Sprintf("%d, not all routines have been executed.", c)) @@ -80,7 +81,7 @@ func TestErrorSizedGroupNoError(t *testing.T) { } func TestErrorSizedGroupTerm(t *testing.T) { - ewg := NewErrSizedGroup(10, TermOnErr()) + ewg := NewErrSizedGroup(10, TermOnErr) var c uint32 for i := 0; i < 1000; i++ { @@ -100,7 
+101,7 @@ func TestErrorSizedGroupTerm(t *testing.T) { } // illustrates the use of a SizedGroup for concurrent, limited execution of goroutines. -func ExampleErrorSizedGroup_go() { +func ExampleErrSizedGroup_go() { // create sized waiting group allowing maximum 10 goroutines grp := NewErrSizedGroup(10) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..20a3a4d --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/go-pkgz/syncs + +require github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4347755 --- /dev/null +++ b/go.sum @@ -0,0 +1,7 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/group_options.go b/group_options.go new file mode 100644 index 0000000..8f27643 --- /dev/null +++ b/group_options.go @@ -0,0 +1,30 @@ +package syncs + +import "context" + +type options struct { + ctx context.Context + cancel context.CancelFunc + preLock bool + termOnError bool +} + +// GroupOption functional option type +type GroupOption func(o *options) + +// Context passes ctx and makes it cancelable +func Context(ctx context.Context) GroupOption { + return func(o *options) { + o.ctx, o.cancel = context.WithCancel(ctx) + } +} + +// Preemptive sets locking mode preventing spawning waiting goroutine. May cause Go call to block! 
+func Preemptive(o *options) { + o.preLock = true +} + +// TermOnErr prevents new goroutines to start after first error +func TermOnErr(o *options) { + o.termOnError = true +} diff --git a/semaphore_test.go b/semaphore_test.go index 4cf3253..1b6ddd0 100644 --- a/semaphore_test.go +++ b/semaphore_test.go @@ -2,6 +2,7 @@ package syncs import ( "sync" + "sync/atomic" "testing" "time" @@ -9,23 +10,24 @@ import ( ) func TestSemaphore(t *testing.T) { - var after3Locks, after4Locks bool + var locks int32 var sema sync.Locker go func() { sema = NewSemaphore(3) sema.Lock() + atomic.AddInt32(&locks, 1) sema.Lock() + atomic.AddInt32(&locks, 1) sema.Lock() - after3Locks = true + atomic.AddInt32(&locks, 1) sema.Lock() - after4Locks = true + atomic.AddInt32(&locks, 1) }() time.Sleep(100 * time.Millisecond) - assert.True(t, after3Locks, "3 locks ok") - assert.False(t, after4Locks, "4 locks should not be able to pass") + assert.Equal(t, int32(3), atomic.LoadInt32(&locks), "3 locks ok, hangs on 4th") sema.Unlock() time.Sleep(100 * time.Millisecond) - assert.True(t, after4Locks, "4 locks ok") + assert.Equal(t, int32(4), atomic.LoadInt32(&locks), "4 locks should happen") } diff --git a/sizedgroup.go b/sizedgroup.go index 27e57c9..66b09be 100644 --- a/sizedgroup.go +++ b/sizedgroup.go @@ -1,41 +1,70 @@ package syncs -import "sync" +import ( + "context" + "sync" +) // SizedGroup has the same role as WaitingGroup but adds a limit of the amount of goroutines started concurrently. // Uses similar Go() scheduling as errgrp.Group, thread safe. 
// SizedGroup interface enforces constructor usage and doesn't allow direct creation of sizedGroup -type SizedGroup interface { - Go(fn func()) - Wait() -} - -type sizedGroup struct { +type SizedGroup struct { + options wg sync.WaitGroup sema sync.Locker } // NewSizedGroup makes wait group with limited size alive goroutines -func NewSizedGroup(size int) SizedGroup { - return &sizedGroup{sema: NewSemaphore(size)} +func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup { + res := SizedGroup{sema: NewSemaphore(size)} + res.options.ctx = context.Background() + for _, opt := range opts { + opt(&res.options) + } + return &res } // Go calls the given function in a new goroutine. // Every call will be unblocked, but some goroutines may wait if semaphore locked. -func (g *sizedGroup) Go(fn func()) { +func (g *SizedGroup) Go(fn func(ctx context.Context)) { + + canceled := func() bool { + select { + case <-g.ctx.Done(): + return true + default: + return false + } + } + + if canceled() { + return + } + g.wg.Add(1) + if g.preLock { + g.sema.Lock() + } + go func() { defer g.wg.Done() - g.sema.Lock() - fn() + if canceled() { + return + } + + if !g.preLock { + g.sema.Lock() + } + + fn(g.ctx) g.sema.Unlock() }() } // Wait blocks until the SizedGroup counter is zero. // See sync.WaitGroup documentation for more information. 
-func (g *sizedGroup) Wait() { +func (g *SizedGroup) Wait() { g.wg.Wait() } diff --git a/sizedgroup_test.go b/sizedgroup_test.go index c16965c..9f27480 100644 --- a/sizedgroup_test.go +++ b/sizedgroup_test.go @@ -1,8 +1,10 @@ package syncs import ( + "context" "fmt" "log" + "runtime" "sync/atomic" "testing" "time" @@ -15,13 +17,49 @@ func TestSizedGroup(t *testing.T) { var c uint32 for i := 0; i < 1000; i++ { - swg.Go(func() { + swg.Go(func(ctx context.Context) { + time.Sleep(5 * time.Millisecond) atomic.AddUint32(&c, 1) }) } + assert.True(t, runtime.NumGoroutine() > 500, "goroutines %d", runtime.NumGoroutine()) + swg.Wait() + assert.Equal(t, uint32(1000), c, fmt.Sprintf("%d, not all routines have been executed", c)) +} + +func TestSizedGroup_Preemptive(t *testing.T) { + swg := NewSizedGroup(10, Preemptive) + var c uint32 + for i := 0; i < 100; i++ { + swg.Go(func(ctx context.Context) { + time.Sleep(5 * time.Millisecond) + atomic.AddUint32(&c, 1) + }) + } + assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine()) + swg.Wait() + assert.Equal(t, uint32(100), c, fmt.Sprintf("%d, not all routines have been executed", c)) +} + +func TestSizedGroup_Canceled(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + swg := NewSizedGroup(10, Preemptive, Context(ctx)) + var c uint32 + + for i := 0; i < 100; i++ { + swg.Go(func(ctx context.Context) { + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Millisecond): + } + atomic.AddUint32(&c, 1) + }) + } swg.Wait() - assert.Equal(t, uint32(1000), c, fmt.Sprintf("%d, not all routines have been executed.", c)) + assert.True(t, c < 100) } // illustrates the use of a SizedGroup for concurrent, limited execution of goroutines. 
@@ -31,7 +69,7 @@ func ExampleSizedGroup_go() { var c uint32 for i := 0; i < 1000; i++ { - grp.Go(func() { // Go call is non-blocking, like regular go statement + grp.Go(func(ctx context.Context) { // Go call is non-blocking, like regular go statement // do some work in 10 goroutines in parallel atomic.AddUint32(&c, 1) time.Sleep(10 * time.Millisecond)