Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Handle panics while polling for tasks (#1352)" #1357

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/common/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ const (
ActivityLocalDispatchFailedCounter = CadenceMetricsPrefix + "activity-local-dispatch-failed"
ActivityLocalDispatchSucceedCounter = CadenceMetricsPrefix + "activity-local-dispatch-succeed"
WorkerPanicCounter = CadenceMetricsPrefix + "worker-panic"
InternalPanicCounter = CadenceMetricsPrefix + "internal-panic"

UnhandledSignalsCounter = CadenceMetricsPrefix + "unhandled-signals"
CorruptedSignalsCounter = CadenceMetricsPrefix + "corrupted-signals"
Expand Down
1 change: 0 additions & 1 deletion internal/internal_logging_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const (
tagVisibilityQuery = "VisibilityQuery"
tagPanicError = "PanicError"
tagPanicStack = "PanicStack"
tagPollerType = "PollerType"
causeTag = "pollerrorcause"
tagWorkflowRuntimeLength = "workflowruntimelength"
tagNonDeterminismDetectionType = "NonDeterminismDetectionType"
Expand Down
61 changes: 25 additions & 36 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pborman/uuid"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
s "go.uber.org/cadence/.gen/go/shared"
Expand Down Expand Up @@ -64,9 +63,7 @@ type (

// basePoller is the base class for all poller implementations
basePoller struct {
shutdownC <-chan struct{}
metricsScope *metrics.TaggedScope
logger *zap.Logger
shutdownC <-chan struct{}
}

// workflowTaskPoller implements polling/processing a workflow task
Expand All @@ -78,6 +75,8 @@ type (
service workflowserviceclient.Interface
taskHandler WorkflowTaskHandler
ldaTunnel *locallyDispatchedActivityTunnel
metricsScope *metrics.TaggedScope
logger *zap.Logger

stickyUUID string
disableStickyExecution bool
Expand All @@ -98,6 +97,8 @@ type (
identity string
service workflowserviceclient.Interface
taskHandler ActivityTaskHandler
metricsScope *metrics.TaggedScope
logger *zap.Logger
activitiesPerSecond float64
featureFlags FeatureFlags
}
Expand All @@ -122,8 +123,10 @@ type (

localActivityTaskPoller struct {
basePoller
handler *localActivityTaskHandler
laTunnel *localActivityTunnel
handler *localActivityTaskHandler
metricsScope tally.Scope
logger *zap.Logger
laTunnel *localActivityTunnel
}

localActivityTaskHandler struct {
Expand Down Expand Up @@ -249,28 +252,20 @@ func (bp *basePoller) doPoll(
var err error
var result interface{}

doneC := make(chan struct{})
ctx, cancel, _ := newChannelContext(context.Background(), featureFlags, chanTimeout(pollTaskServiceTimeOut))
defer cancel()

go func() {
defer cancel()
defer func() {
if p := recover(); p != nil {
bp.metricsScope.Counter(metrics.InternalPanicCounter).Inc(1)
st := getStackTraceRaw("base poller [panic]:", 7, 0)
bp.logger.Error("Unhandled panic",
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
err = newPanicError(p, st)
}
}()
result, err = pollFunc(ctx)
cancel()
close(doneC)
}()

select {
case <-ctx.Done():
case <-doneC:
return result, err
case <-bp.shutdownC:
cancel()
return nil, errShutdown
}
}
Expand All @@ -284,17 +279,15 @@ func newWorkflowTaskPoller(
params workerExecutionParameters,
) *workflowTaskPoller {
return &workflowTaskPoller{
basePoller: basePoller{
shutdownC: params.WorkerStopChannel,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
logger: params.Logger.With(zapcore.Field{Key: tagPollerType, Type: zapcore.StringType, String: "Workflow"}),
},
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
service: service,
domain: domain,
taskListName: params.TaskList,
identity: params.Identity,
taskHandler: taskHandler,
ldaTunnel: ldaTunnel,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
logger: params.Logger,
stickyUUID: uuid.New(),
disableStickyExecution: params.DisableStickyExecution,
StickyScheduleToStartTimeout: params.StickyScheduleToStartTimeout,
Expand Down Expand Up @@ -538,13 +531,11 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
tracer: params.Tracer,
}
return &localActivityTaskPoller{
basePoller: basePoller{
shutdownC: params.WorkerStopChannel,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
logger: params.Logger.With(zapcore.Field{Key: tagPollerType, Type: zapcore.StringType, String: "LocalActivity"}),
},
handler: handler,
laTunnel: laTunnel,
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
handler: handler,
metricsScope: params.MetricsScope,
logger: params.Logger,
laTunnel: laTunnel,
}
}

Expand Down Expand Up @@ -999,16 +990,14 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv
domain string, params workerExecutionParameters) *activityTaskPoller {

activityTaskPoller := &activityTaskPoller{
basePoller: basePoller{
shutdownC: params.WorkerStopChannel,
logger: params.Logger.With(zap.Field{Key: tagPollerType, Type: zapcore.StringType, String: "Activity"}),
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
},
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
taskHandler: taskHandler,
service: service,
domain: domain,
taskListName: params.TaskList,
identity: params.Identity,
logger: params.Logger,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
activitiesPerSecond: params.TaskListActivitiesPerSecond,
featureFlags: params.FeatureFlags,
}
Expand Down
82 changes: 0 additions & 82 deletions internal/internal_task_pollers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/yarpc"
"go.uber.org/zap/zaptest"

"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
"go.uber.org/cadence/.gen/go/shared"
)

func TestLocalActivityPanic(t *testing.T) {
Expand All @@ -59,80 +54,3 @@ func TestLocalActivityPanic(t *testing.T) {
assert.Contains(t, perr.StackTrace(), "panic")
assert.Contains(t, perr.StackTrace(), t.Name(), "should mention the source location of the local activity that panicked")
}

// TestActivityTaskPollerHandlesPanics makes the mocked PollForActivityTask call
// panic and expects PollTask to return a nil result together with an error that
// unwraps to *PanicError carrying the original panic value.
func TestActivityTaskPollerHandlesPanics(t *testing.T) {
	ctrl := gomock.NewController(t)
	service := workflowservicetest.NewMockClient(ctrl)
	// The mocked poll call panics instead of returning a response.
	service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ *shared.PollForActivityTaskRequest, opts ...yarpc.CallOption) (*shared.PollForActivityTaskResponse, error) {
		panic("oh no")
	})
	workerStopChannel := make(chan struct{}, 1)
	activityPoller := newActivityTaskPoller(nil, service, "test", workerExecutionParameters{
		TaskList: "tasklist",

		WorkerStopChannel: workerStopChannel,
		WorkerOptions: WorkerOptions{
			Identity: "identity",
			Logger:   zaptest.NewLogger(t),
		},
	})

	result, err := activityPoller.PollTask()

	assert.Nil(t, result)
	var panicErr *PanicError
	// assert.ErrorAs does not abort the test on failure, so panicErr may still
	// be nil here; only inspect it when the match succeeded, otherwise the
	// dereference would panic and bury the real assertion failure.
	if assert.ErrorAs(t, err, &panicErr) {
		assert.Equal(t, "oh no", panicErr.value)
	}
}

// TestActivityTaskPollerHandlesCancel signals the worker stop channel from
// inside the mocked poll call, keeps that call blocked, and expects PollTask to
// return a nil result with errShutdown.
func TestActivityTaskPollerHandlesCancel(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	client := workflowservicetest.NewMockClient(mockCtrl)

	stopC := make(chan struct{}, 1)
	// Closed when the test returns, which releases the parked mock call.
	releasePoll := make(chan struct{}, 1)
	defer close(releasePoll)

	poller := newActivityTaskPoller(nil, client, "test", workerExecutionParameters{
		TaskList: "tasklist",

		WorkerStopChannel: stopC,
		WorkerOptions: WorkerOptions{
			Identity: "identity",
			Logger:   zaptest.NewLogger(t),
		},
	})

	// The poll stub requests shutdown and then blocks until the test is done.
	blockedPoll := func(_ context.Context, _ *shared.PollForActivityTaskRequest, opts ...yarpc.CallOption) (*shared.PollForActivityTaskResponse, error) {
		stopC <- struct{}{}
		<-releasePoll
		return nil, nil
	}
	client.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(blockedPoll)

	task, pollErr := poller.PollTask()

	assert.Nil(t, task)
	assert.ErrorIs(t, pollErr, errShutdown)
}

// TestWorkflowTaskPollerHandlesPanics makes the mocked PollForDecisionTask call
// panic and expects PollTask to return a nil result together with an error that
// unwraps to *PanicError carrying the original panic value.
func TestWorkflowTaskPollerHandlesPanics(t *testing.T) {
	ctrl := gomock.NewController(t)
	service := workflowservicetest.NewMockClient(ctrl)
	// The mocked poll call panics instead of returning a response.
	service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ *shared.PollForDecisionTaskRequest, opts ...yarpc.CallOption) (*shared.PollForDecisionTaskResponse, error) {
		panic("oh no")
	})
	workerStopChannel := make(chan struct{}, 1)
	workflowTaskPoller := newWorkflowTaskPoller(nil, nil, service, "test", workerExecutionParameters{
		TaskList: "tasklist",

		WorkerStopChannel: workerStopChannel,
		WorkerOptions: WorkerOptions{
			Identity: "identity",
			Logger:   zaptest.NewLogger(t),
		},
	})

	result, err := workflowTaskPoller.PollTask()

	assert.Nil(t, result)
	var panicErr *PanicError
	// assert.ErrorAs does not abort the test on failure, so panicErr may still
	// be nil here; only inspect it when the match succeeded, otherwise the
	// dereference would panic and bury the real assertion failure.
	if assert.ErrorAs(t, err, &panicErr) {
		assert.Equal(t, "oh no", panicErr.value)
	}
}
Loading