Skip to content

Commit

Permalink
add activity info to activity tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
ketsiambaku committed Aug 28, 2024
1 parent 827437f commit d7b4266
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 8 deletions.
8 changes: 8 additions & 0 deletions debug/interfaces.go → debug/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,12 @@ type (
// stats on the Worker for debugging purposes.
// Deprecated: in development and very likely to change
WorkerStats = internal.WorkerStats

// ActivityTracker is a worker option to track executing activities on a worker
// Deprecated: in development and very likely to change
ActivityTracker = internal.ActivityTracker

// ActivityInfo ...
// Deprecated: in development and very likely to change
ActivityInfo = internal.ActivityInfo
)
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ type (
// stats on the Worker for debugging purposes.
// Deprecated: in development and very likely to change
WorkerStats struct {
PollerTracker PollerTracker
PollerTracker PollerTracker
ActivityTracker ActivityTracker
}

// ActivityInfo ...
// Deprecated: in development and very likely to change
ActivityInfo struct {
WorkflowID string
RunID string
TaskList string
ActivityType string
}

// ActivityTracker is a worker option to track executing activities on a worker
// Deprecated: in development and very likely to change
ActivityTracker interface {
// Start ...
Start(info ActivityInfo) Stopper
// Stats ...
Stats()
}
)
12 changes: 12 additions & 0 deletions internal/common/debug/workerstats_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type (
pollerTrackerNoopImpl struct{}
// stopperNoopImpl implements the Stopper interface
stopperNoopImpl struct{}
// activityTrackerNoopImpl implements the ActivityTracker interface
activityTrackerNoopImpl struct{}
)

func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} }
Expand All @@ -33,3 +35,13 @@ func (r *stopperNoopImpl) Stop() {}

// NewNoopPollerTracker creates a new PollerTracker instance
func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} }

func (at *activityTrackerNoopImpl) Start(info ActivityInfo) Stopper { return &stopperNoopImpl{} }
func (at *activityTrackerNoopImpl) Stats() {}

// NewNoopActivityTracker creates a new PollerTracker instance
func NewNoopActivityTracker() ActivityTracker { return &activityTrackerNoopImpl{} }

var _ PollerTracker = &pollerTrackerNoopImpl{}
var _ Stopper = &stopperNoopImpl{}
var _ ActivityTracker = &activityTrackerNoopImpl{}
11 changes: 10 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"errors"
"fmt"
"go.uber.org/cadence/internal/common/debug"
"math"
"reflect"
"strings"
Expand Down Expand Up @@ -152,6 +153,7 @@ type (
contextPropagators []ContextPropagator
tracer opentracing.Tracer
featureFlags FeatureFlags
activityTracker debug.ActivityTracker
}

// history wrapper method to help information about events.
Expand Down Expand Up @@ -1417,6 +1419,7 @@ func newActivityTaskHandlerWithCustomProvider(
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
featureFlags: params.FeatureFlags,
activityTracker: params.WorkerStats.ActivityTracker,
}
}

Expand Down Expand Up @@ -1710,7 +1713,13 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
}
}()
}

debugInfo := debug.ActivityInfo{
WorkflowID: *t.WorkflowExecution.WorkflowId,
RunID: *t.WorkflowExecution.RunId,
TaskList: ath.taskListName,
ActivityType: activityType,
}
defer ath.activityTracker.Start(debugInfo).Stop()
output, err := activityImplementation.Execute(ctx, t.Input)

dlCancelFunc()
Expand Down
1 change: 1 addition & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() {
Tracer: opentracing.NoopTracer{},
},
}
ensureRequiredParams(&wep)
activityHandler := newActivityTaskHandler(mockService, wep, registry)
pats := &s.PollForActivityTaskResponse{
TaskToken: []byte("token"),
Expand Down
10 changes: 10 additions & 0 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"errors"
"fmt"
"go.uber.org/cadence/internal/common/debug"
"sync"
"time"

Expand Down Expand Up @@ -136,6 +137,7 @@ type (
dataConverter DataConverter
contextPropagators []ContextPropagator
tracer opentracing.Tracer
activityTracker debug.ActivityTracker
}

localActivityResult struct {
Expand Down Expand Up @@ -529,6 +531,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
dataConverter: params.DataConverter,
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
activityTracker: params.WorkerStats.ActivityTracker,
}
return &localActivityTaskPoller{
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
Expand Down Expand Up @@ -658,6 +661,13 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi

laStartTime := time.Now()
ctx, span := createOpenTracingActivitySpan(ctx, lath.tracer, time.Now(), task.params.ActivityType, task.params.WorkflowInfo.WorkflowExecution.ID, task.params.WorkflowInfo.WorkflowExecution.RunID)
debugInfo := debug.ActivityInfo{
WorkflowID: task.params.WorkflowInfo.WorkflowExecution.ID,
RunID: task.params.WorkflowInfo.WorkflowExecution.RunID,
TaskList: task.params.WorkflowInfo.TaskListName,
ActivityType: activityType,
}
defer lath.activityTracker.Start(debugInfo).Stop()
defer span.Finish()
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
executionLatency := time.Now().Sub(laStartTime)
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ func ensureRequiredParams(params *workerExecutionParameters) {
params.WorkerStats.PollerTracker = debug.NewNoopPollerTracker()
params.Logger.Debug("No PollerTracker configured for WorkerStats option. Will use the default.")
}
if params.WorkerStats.ActivityTracker == nil {
params.WorkerStats.ActivityTracker = debug.NewNoopActivityTracker()
params.Logger.Debug("No ActivityTracker configured for WorkerStats option. Will use the default.")
}
}

// verifyDomainExist does a DescribeDomain operation on the specified domain with backoff/retry
Expand Down
11 changes: 9 additions & 2 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,10 @@ func TestWorkerOptionDefaults(t *testing.T) {
Logger: decisionWorker.executionParameters.Logger,
MetricsScope: decisionWorker.executionParameters.MetricsScope,
Identity: decisionWorker.executionParameters.Identity,
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
WorkerStats: debug.WorkerStats{
PollerTracker: debug.NewNoopPollerTracker(),
ActivityTracker: debug.NewNoopActivityTracker(),
},
},
UserContext: decisionWorker.executionParameters.UserContext,
}
Expand Down Expand Up @@ -1161,7 +1164,10 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
Logger: options.Logger,
MetricsScope: options.MetricsScope,
Identity: options.Identity,
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
WorkerStats: debug.WorkerStats{
PollerTracker: debug.NewNoopPollerTracker(),
ActivityTracker: debug.NewNoopActivityTracker(),
},
},
}

Expand Down Expand Up @@ -1190,6 +1196,7 @@ func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParam
require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay)
require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution)
require.Equal(t, paramsA.WorkerStats.PollerTracker, paramsB.WorkerStats.PollerTracker)
require.Equal(t, paramsA.WorkerStats.ActivityTracker, paramsB.WorkerStats.ActivityTracker)
}

/*
Expand Down
11 changes: 7 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"context"
"errors"
"fmt"
"go.uber.org/cadence/internal/common/debug"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -626,10 +627,11 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
},
}
taskHandler := localActivityTaskHandler{
userContext: env.workerOptions.BackgroundActivityContext,
metricsScope: env.metricsScope,
logger: env.logger,
tracer: opentracing.NoopTracer{},
userContext: env.workerOptions.BackgroundActivityContext,
metricsScope: env.metricsScope,
logger: env.logger,
tracer: opentracing.NoopTracer{},
activityTracker: debug.NewNoopActivityTracker(),
}

result := taskHandler.executeLocalActivityTask(task)
Expand Down Expand Up @@ -1195,6 +1197,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteLocalActivity(params executeLocal
dataConverter: wOptions.DataConverter,
tracer: wOptions.Tracer,
contextPropagators: wOptions.ContextPropagators,
activityTracker: debug.NewNoopActivityTracker(),
}

env.localActivities[activityID] = task
Expand Down

0 comments on commit d7b4266

Please sign in to comment.