From d28b329f9429c7c9eff54372fa117e6370821713 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Tue, 19 Mar 2024 19:59:56 +0800 Subject: [PATCH] feat: add signal --- lang/channel/singal.go | 63 +++++++++++++++++++++++++++++++++++++ lang/channel/singal_test.go | 47 +++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 lang/channel/singal.go create mode 100644 lang/channel/singal_test.go diff --git a/lang/channel/singal.go b/lang/channel/singal.go new file mode 100644 index 00000000..561c20f5 --- /dev/null +++ b/lang/channel/singal.go @@ -0,0 +1,63 @@ +package channel + +import ( + "context" + "time" +) + +var ( + _ Signal = (*sigal)(nil) +) + +type Signal interface { + Signal() + Wait(ctx context.Context) bool +} + +type SignalOption func(c *sigal) + +func WithSinalTimeout(timeout time.Duration) SignalOption { + return func(s *sigal) { + s.timeout = timeout + } +} + +func NewSignal(opts ...SignalOption) Signal { + sg := new(sigal) + for _, opt := range opts { + opt(sg) + } + sg.trigger = make(chan struct{}) + return sg +} + +type sigal struct { + trigger chan struct{} + timeout time.Duration +} + +func (s *sigal) Signal() { + select { + case <-s.trigger: + default: + close(s.trigger) + } +} + +func (s *sigal) Wait(ctx context.Context) bool { + if s.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, s.timeout) + defer cancel() + } + if ctx == nil || ctx.Done() == nil { + <-s.trigger + return true + } + select { + case <-s.trigger: + return true + case <-ctx.Done(): + return false + } +} diff --git a/lang/channel/singal_test.go b/lang/channel/singal_test.go new file mode 100644 index 00000000..1612bf38 --- /dev/null +++ b/lang/channel/singal_test.go @@ -0,0 +1,47 @@ +package channel + +import ( + "context" + "runtime" + "sync/atomic" + "testing" + "time" +) + +func TestSignal(t *testing.T) { + sg := NewSignal() + var finished int32 + emptyCtx := context.Background() + cancelCtx, cancelFunc := context.WithCancel(emptyCtx) + for i := 0; i < 10; i++ { + go func(i int) { + if i%2 == 0 { + sg.Wait(emptyCtx) + } else { + sg.Wait(cancelCtx) + } + atomic.AddInt32(&finished, 1) + }(i) + } + time.Sleep(time.Millisecond * 100) + cancelFunc() + for atomic.LoadInt32(&finished) != int32(5) { + runtime.Gosched() + } + sg.Signal() + for atomic.LoadInt32(&finished) != int32(10) { + runtime.Gosched() + } +} + +func TestSignalTimeout(t *testing.T) { + sg := NewSignal(WithSinalTimeout(time.Millisecond * 200)) + go func() { + time.Sleep(time.Millisecond * 500) + sg.Signal() + }() + begin := time.Now() + sg.Wait(context.Background()) + cost := time.Since(begin) + t.Logf("cost=%dms", cost.Milliseconds()) +}