Skip to content

Commit

Permalink
remove unneded interfaces, extend sized group with the same params as…
Browse files Browse the repository at this point in the history
… err group
  • Loading branch information
umputun committed Feb 3, 2019
1 parent ac098f9 commit 72b3cd4
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 86 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2018 Umputun
Copyright (c) 2019 Umputun

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ Implements `sync.Locker` interface but for given capacity, thread safe. Lock inc

Mix semaphore and WaitGroup to provide sized waiting group. The result is a wait group allowing limited number of goroutine to run in parallel.

The locking happens inside of goroutine, i.e. **every call will be non-blocked**, but some goroutines may wait if semaphore locked. It means - technically it doesn't limit number of goroutines, but rather number of running (active) goroutines.
By default the locking happens inside of goroutine, i.e. **every call will be non-blocked**, but some goroutines may wait if semaphore locked. It means - technically it doesn't limit number of goroutines, but rather number of running (active) goroutines.
In order to block goroutines from even starting use `Preemptive` option (see below).

```go
swg := syncs.NewSizedGroup(5) // wait group with max size=5
for i :=0; i<10; i++ {
swg.Go(fn func(){
doThings() // only 5 of these will run in parallel
swg.Go(fn func(ctx context.Context){
doThings(ctx) // only 5 of these will run in parallel
})
}
swg.Wait()
Expand All @@ -48,17 +49,17 @@ Works the same as errgrp.Group, i.e. returns first error.
Can work as regular errgrp.Group or with early termination.
Thread safe.

Supports both in-goroutine-wait via `NewErrSizedGroup` as well as outside of goroutine wait with `Preemptive()` option. Another options are `TermOnErr` which will skip (won't start) all other goroutines if any error returned, and `Context`.
Supports both in-goroutine-wait via `NewErrSizedGroup` as well as outside of goroutine wait with `Preemptive` option. Another options are `TermOnErr` which will skip (won't start) all other goroutines if any error returned, and `Context` for early termination/timeouts.

Important! With `Preemptive` Go call **can block**. In case if maximum size reached the call will wait till number of running goroutines
dropped under max. This way we not only limiting number of running goroutines but also number of waiting goroutines.


```go
ewg := syncs.NewErrSizedGroup(5, syncs.Preemptive()) // error wait group with max size=5, don't try to start more if any error happened
ewg := syncs.NewErrSizedGroup(5, syncs.Preemptive) // error wait group with max size=5, don't try to start more if any error happened
for i :=0; i<10; i++ {
ewg.Go(fn func() error { // Go here could be blocked if trying to run >5 at the same time
err := doThings() // only 5 of these will run in parallel
ewg.Go(fn func(ctx context.Context) error { // Go here could be blocked if trying to run >5 at the same time
err := doThings(ctx) // only 5 of these will run in parallel
return err
})
}
Expand Down
60 changes: 10 additions & 50 deletions errsizedgroup.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
package syncs

import (
"context"
"fmt"
"log"
"strings"
"sync"
)

// ErrSizedGroup is a SizedGroup with error control. Works the same as errgrp.Group, i.e. returns first error.
// Can work as regular errgrp.Group or with early termination. Thread safe.
// ErrSizedGroup interface enforces constructor usage and doesn't allow direct creation of errSizedGroup
type ErrSizedGroup interface {
Go(fn func() error)
Wait() error
}

type errSizedGroup struct {
wg sync.WaitGroup
sema sync.Locker
ctx context.Context
cancel func()
termOnError bool
preLock bool
type ErrSizedGroup struct {
options
wg sync.WaitGroup
sema sync.Locker

err *multierror
errLock sync.RWMutex
Expand All @@ -32,16 +22,15 @@ type errSizedGroup struct {
// NewErrSizedGroup makes wait group with limited size alive goroutines.
// By default all goroutines will be started but will wait inside. For limited number of goroutines use Preemptive() options.
// TermOnErr will skip (won't start) all other goroutines if any error returned.
func NewErrSizedGroup(size int, options ...ESGOption) ErrSizedGroup {
res := errSizedGroup{
func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {

res := ErrSizedGroup{
sema: NewSemaphore(size),
err: new(multierror),
}

for _, opt := range options {
if err := opt(&res); err != nil {
log.Printf("[WARN] failed to set cache option, %v", err)
}
opt(&res.options)
}

return &res
Expand All @@ -50,7 +39,7 @@ func NewErrSizedGroup(size int, options ...ESGOption) ErrSizedGroup {
// Go calls the given function in a new goroutine.
// The first call to return a non-nil error cancels the group if termOnError; its error will be
// returned by Wait. If no termOnError all errors will be collected in multierror.
func (g *errSizedGroup) Go(f func() error) {
func (g *ErrSizedGroup) Go(f func() error) {

g.wg.Add(1)

Expand Down Expand Up @@ -97,43 +86,14 @@ func (g *errSizedGroup) Go(f func() error) {

// Wait blocks until all function calls from the Go method have returned, then
// returns the first all errors (if any) wrapped with multierror from them.
func (g *errSizedGroup) Wait() error {
func (g *ErrSizedGroup) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err.errorOrNil()
}

// ESGOption functional option type
type ESGOption func(esg *errSizedGroup) error

// Context passes ctx and makes it cancelable
func Context(ctx context.Context) ESGOption {
return func(esg *errSizedGroup) error {
ctxWithCancel, cancel := context.WithCancel(ctx)
esg.cancel = cancel
esg.ctx = ctxWithCancel
return nil
}
}

// Preemptive sets locking mode preventing spawning waiting goroutine. May cause Go call to block!
func Preemptive() ESGOption {
return func(esg *errSizedGroup) error {
esg.preLock = true
return nil
}
}

// TermOnErr prevents new goroutines to start after first error
func TermOnErr() ESGOption {
return func(esg *errSizedGroup) error {
esg.termOnError = true
return nil
}
}

type multierror struct {
errors []error
lock sync.Mutex
Expand Down
13 changes: 7 additions & 6 deletions errsizedgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestErrorSizedGroup(t *testing.T) {
for i := 0; i < 1000; i++ {
i := i
ewg.Go(func() error {
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 10)
atomic.AddUint32(&c, 1)
if i == 100 {
return errors.New("err1")
Expand All @@ -31,15 +31,15 @@ func TestErrorSizedGroup(t *testing.T) {
return nil
})
}
assert.True(t, runtime.NumGoroutine() > 900, "goroutines %d", runtime.NumGoroutine())
assert.True(t, runtime.NumGoroutine() > 500, "goroutines %d", runtime.NumGoroutine())

err := ewg.Wait()
assert.True(t, strings.HasPrefix(err.Error(), "2 error(s) occurred:"))
assert.Equal(t, uint32(1000), c, fmt.Sprintf("%d, not all routines have been executed.", c))
}

func TestErrorSizedGroupPreGo(t *testing.T) {
ewg := NewErrSizedGroup(10, Preemptive())
func TestErrorSizedGroupPreemptive(t *testing.T) {
ewg := NewErrSizedGroup(10, Preemptive)
var c uint32

for i := 0; i < 1000; i++ {
Expand All @@ -58,6 +58,7 @@ func TestErrorSizedGroupPreGo(t *testing.T) {
})
}

assert.True(t, runtime.NumGoroutine() <= 20, "goroutines %d", runtime.NumGoroutine())
err := ewg.Wait()
assert.True(t, strings.HasPrefix(err.Error(), "2 error(s) occurred:"))
assert.Equal(t, uint32(1000), c, fmt.Sprintf("%d, not all routines have been executed.", c))
Expand All @@ -80,7 +81,7 @@ func TestErrorSizedGroupNoError(t *testing.T) {
}

func TestErrorSizedGroupTerm(t *testing.T) {
ewg := NewErrSizedGroup(10, TermOnErr())
ewg := NewErrSizedGroup(10, TermOnErr)
var c uint32

for i := 0; i < 1000; i++ {
Expand All @@ -100,7 +101,7 @@ func TestErrorSizedGroupTerm(t *testing.T) {
}

// illustrates the use of a SizedGroup for concurrent, limited execution of goroutines.
func ExampleErrorSizedGroup_go() {
func ExampleErrSizedGroup_go() {

// create sized waiting group allowing maximum 10 goroutines
grp := NewErrSizedGroup(10)
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/go-pkgz/syncs

require github.com/stretchr/testify v1.3.0
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
30 changes: 30 additions & 0 deletions group_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package syncs

import "context"

type options struct {
ctx context.Context
cancel context.CancelFunc
preLock bool
termOnError bool
}

// GroupOption functional option type
type GroupOption func(o *options)

// Context passes ctx and makes it cancelable
func Context(ctx context.Context) GroupOption {
return func(o *options) {
o.ctx, o.cancel = context.WithCancel(ctx)
}
}

// Preemptive sets locking mode preventing spawning waiting goroutine. May cause Go call to block!
func Preemptive(o *options) {
o.preLock = true
}

// TermOnErr prevents new goroutines to start after first error
func TermOnErr(o *options) {
o.termOnError = true
}
14 changes: 8 additions & 6 deletions semaphore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,32 @@ package syncs

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSemaphore(t *testing.T) {
var after3Locks, after4Locks bool
var locks int32
var sema sync.Locker
go func() {
sema = NewSemaphore(3)
sema.Lock()
atomic.AddInt32(&locks, 1)
sema.Lock()
atomic.AddInt32(&locks, 1)
sema.Lock()
after3Locks = true
atomic.AddInt32(&locks, 1)
sema.Lock()
after4Locks = true
atomic.AddInt32(&locks, 1)
}()

time.Sleep(100 * time.Millisecond)
assert.True(t, after3Locks, "3 locks ok")
assert.False(t, after4Locks, "4 locks should not be able to pass")
assert.Equal(t, int32(3), atomic.LoadInt32(&locks), "3 locks ok, hangs on 4th")

sema.Unlock()
time.Sleep(100 * time.Millisecond)
assert.True(t, after4Locks, "4 locks ok")
assert.Equal(t, int32(4), atomic.LoadInt32(&locks), "4 locks should happen")
}
55 changes: 42 additions & 13 deletions sizedgroup.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,70 @@
package syncs

import "sync"
import (
"context"
"sync"
)

// SizedGroup has the same role as WaitingGroup but adds a limit of the amount of goroutines started concurrently.
// Uses similar Go() scheduling as errgrp.Group, thread safe.
// SizedGroup interface enforces constructor usage and doesn't allow direct creation of sizedGroup
type SizedGroup interface {
Go(fn func())
Wait()
}

type sizedGroup struct {
type SizedGroup struct {
options
wg sync.WaitGroup
sema sync.Locker
}

// NewSizedGroup makes wait group with limited size alive goroutines
func NewSizedGroup(size int) SizedGroup {
return &sizedGroup{sema: NewSemaphore(size)}
func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup {
res := SizedGroup{sema: NewSemaphore(size)}
res.options.ctx = context.Background()
for _, opt := range opts {
opt(&res.options)
}
return &res
}

// Go calls the given function in a new goroutine.
// Every call will be unblocked, but some goroutines may wait if semaphore locked.
func (g *sizedGroup) Go(fn func()) {
func (g *SizedGroup) Go(fn func(ctx context.Context)) {

canceled := func() bool {
select {
case <-g.ctx.Done():
return true
default:
return false
}
}

if canceled() {
return
}

g.wg.Add(1)

if g.preLock {
g.sema.Lock()
}

go func() {
defer g.wg.Done()

g.sema.Lock()
fn()
if canceled() {
return
}

if !g.preLock {
g.sema.Lock()
}

fn(g.ctx)
g.sema.Unlock()
}()
}

// Wait blocks until the SizedGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (g *sizedGroup) Wait() {
func (g *SizedGroup) Wait() {
g.wg.Wait()
}
Loading

0 comments on commit 72b3cd4

Please sign in to comment.