From d7b426611cf4b9c832778abdd6e638d585b4132d Mon Sep 17 00:00:00 2001 From: Ketsia Date: Wed, 28 Aug 2024 10:43:31 +0200 Subject: [PATCH] add activity info to activity tracker --- debug/{interfaces.go => types.go} | 8 +++++++ .../common/debug/{interfaces.go => types.go} | 21 ++++++++++++++++++- internal/common/debug/workerstats_noop.go | 12 +++++++++++ internal/internal_task_handlers.go | 11 +++++++++- internal/internal_task_handlers_test.go | 1 + internal/internal_task_pollers.go | 10 +++++++++ internal/internal_worker.go | 4 ++++ internal/internal_worker_test.go | 11 ++++++++-- internal/internal_workflow_testsuite.go | 11 ++++++---- 9 files changed, 81 insertions(+), 8 deletions(-) rename debug/{interfaces.go => types.go} (86%) rename internal/common/debug/{interfaces.go => types.go} (79%) diff --git a/debug/interfaces.go b/debug/types.go similarity index 86% rename from debug/interfaces.go rename to debug/types.go index 6398334cb..fa25eb676 100644 --- a/debug/interfaces.go +++ b/debug/types.go @@ -39,4 +39,12 @@ type ( // stats on the Worker for debugging purposes. // Deprecated: in development and very likely to change WorkerStats = internal.WorkerStats + + // ActivityTracker is a worker option to track executing activities on a worker + // Deprecated: in development and very likely to change + ActivityTracker = internal.ActivityTracker + + // ActivityInfo ... + // Deprecated: in development and very likely to change + ActivityInfo = internal.ActivityInfo ) diff --git a/internal/common/debug/interfaces.go b/internal/common/debug/types.go similarity index 79% rename from internal/common/debug/interfaces.go rename to internal/common/debug/types.go index c07ba2277..0ea5080f8 100644 --- a/internal/common/debug/interfaces.go +++ b/internal/common/debug/types.go @@ -43,6 +43,25 @@ type ( // stats on the Worker for debugging purposes. // Deprecated: in development and very likely to change WorkerStats struct { - PollerTracker PollerTracker + PollerTracker PollerTracker + ActivityTracker ActivityTracker + } + + // ActivityInfo ... + // Deprecated: in development and very likely to change + ActivityInfo struct { + WorkflowID string + RunID string + TaskList string + ActivityType string + } + + // ActivityTracker is a worker option to track executing activities on a worker + // Deprecated: in development and very likely to change + ActivityTracker interface { + // Start ... + Start(info ActivityInfo) Stopper + // Stats ... + Stats() } ) diff --git a/internal/common/debug/workerstats_noop.go b/internal/common/debug/workerstats_noop.go index da96bd6e7..9aa6bd888 100644 --- a/internal/common/debug/workerstats_noop.go +++ b/internal/common/debug/workerstats_noop.go @@ -25,6 +25,8 @@ type ( pollerTrackerNoopImpl struct{} // stopperNoopImpl implements the Stopper interface stopperNoopImpl struct{} + // activityTrackerNoopImpl implements the ActivityTracker interface + activityTrackerNoopImpl struct{} ) func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} } @@ -33,3 +35,13 @@ func (r *stopperNoopImpl) Stop() {} // NewNoopPollerTracker creates a new PollerTracker instance func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} } + +func (at *activityTrackerNoopImpl) Start(info ActivityInfo) Stopper { return &stopperNoopImpl{} } +func (at *activityTrackerNoopImpl) Stats() {} + +// NewNoopActivityTracker creates a new PollerTracker instance +func NewNoopActivityTracker() ActivityTracker { return &activityTrackerNoopImpl{} } + +var _ PollerTracker = &pollerTrackerNoopImpl{} +var _ Stopper = &stopperNoopImpl{} +var _ ActivityTracker = &activityTrackerNoopImpl{} diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index c872b87e0..316c2e54d 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -27,6 +27,7 @@ import ( "context" "errors" "fmt" + "go.uber.org/cadence/internal/common/debug" "math" "reflect" "strings" @@ -152,6 +153,7 @@ type ( contextPropagators []ContextPropagator tracer opentracing.Tracer featureFlags FeatureFlags + activityTracker debug.ActivityTracker } // history wrapper method to help information about events. @@ -1417,6 +1419,7 @@ func newActivityTaskHandlerWithCustomProvider( contextPropagators: params.ContextPropagators, tracer: params.Tracer, featureFlags: params.FeatureFlags, + activityTracker: params.WorkerStats.ActivityTracker, } } @@ -1710,7 +1713,13 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit } }() } - + debugInfo := debug.ActivityInfo{ + WorkflowID: *t.WorkflowExecution.WorkflowId, + RunID: *t.WorkflowExecution.RunId, + TaskList: ath.taskListName, + ActivityType: activityType, + } + defer ath.activityTracker.Start(debugInfo).Stop() output, err := activityImplementation.Execute(ctx, t.Input) dlCancelFunc() diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 2fe3a5742..63b813ed9 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -1542,6 +1542,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() { Tracer: opentracing.NoopTracer{}, }, } + ensureRequiredParams(&wep) activityHandler := newActivityTaskHandler(mockService, wep, registry) pats := &s.PollForActivityTaskResponse{ TaskToken: []byte("token"), diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 37c2f3f54..956eb13ca 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -27,6 +27,7 @@ import ( "context" "errors" "fmt" + "go.uber.org/cadence/internal/common/debug" "sync" "time" @@ -136,6 +137,7 @@ type ( dataConverter DataConverter contextPropagators []ContextPropagator tracer opentracing.Tracer + activityTracker debug.ActivityTracker } localActivityResult struct { @@ -529,6 +531,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct dataConverter: params.DataConverter, contextPropagators: params.ContextPropagators, tracer: params.Tracer, + activityTracker: params.WorkerStats.ActivityTracker, } return &localActivityTaskPoller{ basePoller: basePoller{shutdownC: params.WorkerStopChannel}, @@ -658,6 +661,13 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi laStartTime := time.Now() ctx, span := createOpenTracingActivitySpan(ctx, lath.tracer, time.Now(), task.params.ActivityType, task.params.WorkflowInfo.WorkflowExecution.ID, task.params.WorkflowInfo.WorkflowExecution.RunID) + debugInfo := debug.ActivityInfo{ + WorkflowID: task.params.WorkflowInfo.WorkflowExecution.ID, + RunID: task.params.WorkflowInfo.WorkflowExecution.RunID, + TaskList: task.params.WorkflowInfo.TaskListName, + ActivityType: activityType, + } + defer lath.activityTracker.Start(debugInfo).Stop() defer span.Finish() laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs) executionLatency := time.Now().Sub(laStartTime) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index d7f5e0d51..30cdca5f5 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -193,6 +193,10 @@ func ensureRequiredParams(params *workerExecutionParameters) { params.WorkerStats.PollerTracker = debug.NewNoopPollerTracker() params.Logger.Debug("No PollerTracker configured for WorkerStats option. Will use the default.") } + if params.WorkerStats.ActivityTracker == nil { + params.WorkerStats.ActivityTracker = debug.NewNoopActivityTracker() + params.Logger.Debug("No ActivityTracker configured for WorkerStats option. Will use the default.") + } } // verifyDomainExist does a DescribeDomain operation on the specified domain with backoff/retry diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index e393d1009..fd474aa1b 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1100,7 +1100,10 @@ func TestWorkerOptionDefaults(t *testing.T) { Logger: decisionWorker.executionParameters.Logger, MetricsScope: decisionWorker.executionParameters.MetricsScope, Identity: decisionWorker.executionParameters.Identity, - WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()}, + WorkerStats: debug.WorkerStats{ + PollerTracker: debug.NewNoopPollerTracker(), + ActivityTracker: debug.NewNoopActivityTracker(), + }, }, UserContext: decisionWorker.executionParameters.UserContext, } @@ -1161,7 +1164,10 @@ func TestWorkerOptionNonDefaults(t *testing.T) { Logger: options.Logger, MetricsScope: options.MetricsScope, Identity: options.Identity, - WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()}, + WorkerStats: debug.WorkerStats{ + PollerTracker: debug.NewNoopPollerTracker(), + ActivityTracker: debug.NewNoopActivityTracker(), + }, }, } @@ -1190,6 +1196,7 @@ func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParam require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay) require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution) require.Equal(t, paramsA.WorkerStats.PollerTracker, paramsB.WorkerStats.PollerTracker) + require.Equal(t, paramsA.WorkerStats.ActivityTracker, paramsB.WorkerStats.ActivityTracker) } /* diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 0210d1121..cc593c830 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -25,6 +25,7 @@ import ( "context" "errors" "fmt" + "go.uber.org/cadence/internal/common/debug" "reflect" "strings" "sync" @@ -626,10 +627,11 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity( }, } taskHandler := localActivityTaskHandler{ - userContext: env.workerOptions.BackgroundActivityContext, - metricsScope: env.metricsScope, - logger: env.logger, - tracer: opentracing.NoopTracer{}, + userContext: env.workerOptions.BackgroundActivityContext, + metricsScope: env.metricsScope, + logger: env.logger, + tracer: opentracing.NoopTracer{}, + activityTracker: debug.NewNoopActivityTracker(), } result := taskHandler.executeLocalActivityTask(task) @@ -1195,6 +1197,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteLocalActivity(params executeLocal dataConverter: wOptions.DataConverter, tracer: wOptions.Tracer, contextPropagators: wOptions.ContextPropagators, + activityTracker: debug.NewNoopActivityTracker(), } env.localActivities[activityID] = task