Skip to content

Commit

Permalink
feat: cond with timeout control
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Mar 19, 2024
1 parent 21fc7a1 commit 676a5d5
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 0 deletions.
99 changes: 99 additions & 0 deletions lang/channel/cond.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package channel

import (
"context"
"sync/atomic"
"time"
)

var (
_ Cond = (*cond)(nil)
)

type CondOption func(c *cond)

func WithCondTimeout(timeout time.Duration) CondOption {
return func(c *cond) {
c.timeout = timeout
}
}

type Cond interface {
Signal() bool
Broadcast() bool
Wait(ctx context.Context) bool
}

func NewCond(opts ...CondOption) Cond {
return new(cond)
}

type condSignal = chan struct{}

type cond struct {
signal atomic.Value
timeout time.Duration
}

func (c *cond) Signal() bool {
sv := c.signal.Load()
if sv == nil {
return false
}
signal := sv.(condSignal)
select {
case signal <- struct{}{}:
return true
default:
return false
}
}

func (c *cond) Broadcast() bool {
BROADCAST:
sv := c.signal.Load()
if sv == nil {
return false
}
var signal condSignal = nil
if !c.signal.CompareAndSwap(sv, signal) {
goto BROADCAST
}
signal = sv.(condSignal)
select {
case <-signal:
return false
default:
close(signal)
return true
}
}

func (c *cond) Wait(ctx context.Context) bool {
WAIT:
sv := c.signal.Load()
var signal condSignal
if sv == nil {
signal = make(condSignal)
if !c.signal.CompareAndSwap(nil, signal) {
goto WAIT
}
} else {
signal = sv.(condSignal)
}
if c.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.timeout)
defer cancel()
}
if ctx == nil || ctx.Done() == nil {
<-signal
return true
}
select {
case <-signal:
return true
case <-ctx.Done():
return false
}
}
73 changes: 73 additions & 0 deletions lang/channel/cond_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package channel

import (
"context"
"runtime"
"sync/atomic"
"testing"
"time"
)

func TestCond(t *testing.T) {
cd := NewCond()
var finished int32
emptyCtx := context.Background()
cancelCtx, cancelFunc := context.WithCancel(emptyCtx)
for i := 0; i < 10; i++ {
go func(i int) {
if i%2 == 0 {
cd.Wait(emptyCtx)
} else {
cd.Wait(cancelCtx)
}
atomic.AddInt32(&finished, 1)
}(i)
}
time.Sleep(time.Millisecond * 100)
cancelFunc()
for atomic.LoadInt32(&finished) != int32(5) {
runtime.Gosched()
}
cd.Signal()
for atomic.LoadInt32(&finished) != int32(6) {
runtime.Gosched()
}
cd.Signal()
for atomic.LoadInt32(&finished) != int32(7) {
runtime.Gosched()
}
cd.Broadcast()
cd.Signal()
for atomic.LoadInt32(&finished) != int32(10) {
runtime.Gosched()
}
}

func BenchmarkChanCond(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
ch := make(chan struct{})
go func() {
time.Sleep(time.Millisecond)
close(ch)
}()
select {
case <-ch:
case <-time.After(10 * time.Millisecond):
}
}
}

func BenchmarkCond(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
cd := NewCond(WithCondTimeout(10 * time.Millisecond))
go func() {
time.Sleep(time.Millisecond)
cd.Signal()
}()
cd.Wait(context.Background())
}
}

0 comments on commit 676a5d5

Please sign in to comment.