Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Let's take a stab in the dark and see if it works to resolve shutdown races" and it seems to work. #1143

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions internal/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package internal

import (
"fmt"
"sync"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -288,7 +287,6 @@ type cancelCtx struct {

done Channel // closed by the first cancel call.

mu sync.Mutex
canceled bool

children map[canceler]bool // set to nil by the first cancel call
Expand All @@ -310,16 +308,12 @@ func (c *cancelCtx) String() string {
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
c.mu.Lock()
if c.canceled {
c.mu.Unlock()
// calling cancel from multiple go routines isn't safe
// avoid a data race by only allowing the first call
return
}
c.canceled = true
c.mu.Unlock()

if err == nil {
panic("context: internal error: missing cancel error")
}
Expand Down
31 changes: 31 additions & 0 deletions internal/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,34 @@ func TestContext_RaceRegression(t *testing.T) {
env.ExecuteWorkflow(wf)
assert.NoError(t, env.GetWorkflowError())
}

// TestContext_RaceRegression_2 is a regression test for a data race between
// cancel propagation and concurrently registering new child contexts.
func TestContext_RaceRegression_2(t *testing.T) {
	// It's apparently also possible to race on adding children while
	// propagating the cancel to children.
	suite := WorkflowTestSuite{}
	suite.SetLogger(zaptest.NewLogger(t))
	testEnv := suite.NewTestWorkflowEnvironment()

	workflowFn := func(ctx Context) error {
		ctx, cancel := WithCancel(ctx)
		raceInducer := func(ctx Context) {
			defer cancel() // defer is necessary as Sleep will never return due to Goexit
			defer func() {
				_, childCancel := WithCancel(ctx)
				cancel()
				childCancel()
			}()
			_ = Sleep(ctx, time.Hour)
		}
		// start a handful to increase odds of a race being detected
		for n := 0; n < 10; n++ {
			Go(ctx, raceInducer)
		}

		_ = Sleep(ctx, time.Minute) // die early
		return nil
	}

	testEnv.RegisterWorkflow(workflowFn)
	testEnv.ExecuteWorkflow(workflowFn)
	assert.NoError(t, testEnv.GetWorkflowError())
}
39 changes: 22 additions & 17 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,15 @@ type (
unblockFunc func(status string, stackDepth int) (keepBlocked bool)

coroutineState struct {
name string
dispatcher *dispatcherImpl // dispatcher this context belongs to
aboutToBlock chan bool // used to notify dispatcher that coroutine that owns this context is about to block
unblock chan unblockFunc // used to notify coroutine that it should continue executing.
keptBlocked bool // true indicates that coroutine didn't make any progress since the last yield unblocking
closed bool // indicates that owning coroutine has finished execution
blocked atomic.Bool
panicError *workflowPanicError // non nil if coroutine had unhandled panic
name string
dispatcher *dispatcherImpl // dispatcher this context belongs to
aboutToBlock chan bool // used to notify dispatcher that coroutine that owns this context is about to block
unblock chan unblockFunc // used to notify coroutine that it should continue executing.
keptBlocked bool // true indicates that coroutine didn't make any progress since the last yield unblocking
closed bool // indicates that owning coroutine has finished execution
completedShutdown chan struct{} // closed after .closed is set to true, use to wait for shutdown
blocked atomic.Bool
panicError *workflowPanicError // non nil if coroutine had unhandled panic
}

dispatcherImpl struct {
Expand Down Expand Up @@ -596,7 +597,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
hasResult = false
v, ok, m := c.receiveAsyncImpl(callback)

if !ok && !m { //channel closed and empty
if !ok && !m { // channel closed and empty
return m
}

Expand All @@ -606,7 +607,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state.unblocked()
return m
}
continue //corrupt signal. Drop and reset process
continue // corrupt signal. Drop and reset process
}
for {
if hasResult {
Expand All @@ -615,7 +616,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state.unblocked()
return more
}
break //Corrupt signal. Drop and reset process.
break // Corrupt signal. Drop and reset process.
}
state.yield(fmt.Sprintf("blocked on %s.Receive", c.name))
}
Expand All @@ -631,7 +632,7 @@ func (c *channelImpl) ReceiveAsync(valuePtr interface{}) (ok bool) {
func (c *channelImpl) ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) {
for {
v, ok, more := c.receiveAsyncImpl(nil)
if !ok && !more { //channel closed and empty
if !ok && !more { // channel closed and empty
return ok, more
}

Expand Down Expand Up @@ -774,7 +775,7 @@ func (c *channelImpl) Close() {
// Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize
func (c *channelImpl) assignValue(from interface{}, to interface{}) error {
err := decodeAndAssignValue(c.dataConverter, from, to)
//add to metrics
// add to metrics
if err != nil {
c.env.GetLogger().Error(fmt.Sprintf("Corrupt signal received on channel %s. Error deserializing", c.name), zap.Error(err))
c.env.GetMetricsScope().Counter(metrics.CorruptedSignalsCounter).Inc(1)
Expand Down Expand Up @@ -840,6 +841,7 @@ func (s *coroutineState) call() {

// close marks the owning coroutine as finished and signals its shutdown.
// The closed flag is set before completedShutdown is closed, matching the
// field's documented contract ("closed after .closed is set to true").
func (s *coroutineState) close() {
	s.closed = true
	// Unblock any goroutine waiting for this coroutine's shutdown
	// (exit() waits on completedShutdown).
	close(s.completedShutdown)
	// Notify the dispatcher that this coroutine will not run again.
	s.aboutToBlock <- true
}

Expand All @@ -849,6 +851,8 @@ func (s *coroutineState) exit() {
runtime.Goexit()
return true
}
// wait for it
<-s.completedShutdown
}
}

Expand Down Expand Up @@ -887,10 +891,11 @@ func (d *dispatcherImpl) newNamedCoroutine(ctx Context, name string, f func(ctx

func (d *dispatcherImpl) newState(name string) *coroutineState {
c := &coroutineState{
name: name,
dispatcher: d,
aboutToBlock: make(chan bool, 1),
unblock: make(chan unblockFunc),
name: name,
dispatcher: d,
aboutToBlock: make(chan bool, 1),
unblock: make(chan unblockFunc),
completedShutdown: make(chan struct{}),
}
d.sequence++
d.coroutines = append(d.coroutines, c)
Expand Down