Skip to content

Commit

Permalink
Add ActivityTracker to worker stats option (#1362)
Browse files Browse the repository at this point in the history
* Add activity tracker option and emit activity counter

* update workflow test suite

* Revert "update workflow test suite"

This reverts commit e6798e0.

* Revert "Add activity tracker option and emit activity counter"

This reverts commit 0755ceb.

* add activity info to activity tracker

* remove idls change

* add example

* run linter and add type comments

* add more type comments

* address comments
  • Loading branch information
ketsiambaku authored Aug 29, 2024
1 parent ed58224 commit 3d320e7
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 16 deletions.
12 changes: 12 additions & 0 deletions debug/interfaces.go → debug/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,16 @@ type (
// stats on the Worker for debugging purposes.
// Deprecated: in development and very likely to change
WorkerStats = internal.WorkerStats

// ActivityTracker is a worker option to track executing activities on a worker
// Deprecated: in development and very likely to change
ActivityTracker = internal.ActivityTracker

// ActivityInfo contains details on the executing activity
// Deprecated: in development and very likely to change
ActivityInfo = internal.ActivityInfo

// Activities is a list of executing activities on the worker
// Deprecated: in development and very likely to change
Activities = internal.Activities
)
121 changes: 113 additions & 8 deletions internal/common/debug/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
package debug

import (
"encoding/json"
"fmt"
"sync"

"go.uber.org/atomic"
)
Expand All @@ -36,8 +38,57 @@ type (
stopperImpl struct {
pollerTracker *pollerTrackerImpl
}

// activityTrackerImpl implements the ActivityTracker interface
activityTrackerImpl struct {
sync.RWMutex
activityCount map[ActivityInfo]int64
}

// activityStopperImpl implements the Stopper interface
activityStopperImpl struct {
sync.Once
info ActivityInfo
tracker *activityTrackerImpl
}
)

var _ ActivityTracker = &activityTrackerImpl{}
var _ Stopper = &activityStopperImpl{}

func (ati *activityTrackerImpl) Start(info ActivityInfo) Stopper {
ati.Lock()
defer ati.Unlock()
ati.activityCount[info]++
return &activityStopperImpl{info: info, tracker: ati}
}

func (ati *activityTrackerImpl) Stats() Activities {
var activities Activities
ati.RLock()
defer ati.RUnlock()
for a, count := range ati.activityCount {
if count > 0 {
activities = append(activities, struct {
Info ActivityInfo
Count int64
}{Info: a, Count: count})
}
}
return activities
}

func (asi *activityStopperImpl) Stop() {
asi.Do(func() {
asi.tracker.Lock()
defer asi.tracker.Unlock()
asi.tracker.activityCount[asi.info]--
if asi.tracker.activityCount[asi.info] == 0 {
delete(asi.tracker.activityCount, asi.info)
}
})
}

func (p *pollerTrackerImpl) Start() Stopper {
p.pollerCount.Inc()
return &stopperImpl{
Expand All @@ -58,24 +109,78 @@ func Example() {
pollerTracker = &pollerTrackerImpl{}

// Initially, poller count should be 0
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Start a poller and verify that the count increments
stopper1 := pollerTracker.Start()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Start another poller and verify that the count increments again
stopper2 := pollerTracker.Start()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Stop the pollers and verify the counter
stopper1.Stop()
stopper2.Stop()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

var activityTracker ActivityTracker
activityTracker = &activityTrackerImpl{activityCount: make(map[ActivityInfo]int64)}

info1 := ActivityInfo{
TaskList: "task-list",
ActivityType: "activity1",
}

info2 := ActivityInfo{
TaskList: "task-list",
ActivityType: "activity2",
}

stopper1 = activityTracker.Start(info1)
stopper2 = activityTracker.Start(info2)
jsonActivities, _ := json.MarshalIndent(activityTracker.Stats(), "", " ")
fmt.Println(string(jsonActivities))

stopper1.Stop()
stopper1.Stop()
jsonActivities, _ = json.MarshalIndent(activityTracker.Stats(), "", " ")

fmt.Println(string(jsonActivities))
stopper2.Stop()

jsonActivities, _ = json.MarshalIndent(activityTracker.Stats(), "", " ")
fmt.Println(string(jsonActivities))

// Output:
// stats: 0
// stats: 1
// stats: 2
// stats: 0
// poller stats: 0
// poller stats: 1
// poller stats: 2
// poller stats: 0
// [
// {
// "Info": {
// "TaskList": "task-list",
// "ActivityType": "activity1"
// },
// "Count": 1
// },
// {
// "Info": {
// "TaskList": "task-list",
// "ActivityType": "activity2"
// },
// "Count": 1
// }
// ]
// [
// {
// "Info": {
// "TaskList": "task-list",
// "ActivityType": "activity2"
// },
// "Count": 1
// }
// ]
// null
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,30 @@ type (
// stats on the Worker for debugging purposes.
// Deprecated: in development and very likely to change
WorkerStats struct {
PollerTracker PollerTracker
PollerTracker PollerTracker
ActivityTracker ActivityTracker
}

// ActivityInfo contains details on the executing activity
// Deprecated: in development and very likely to change
ActivityInfo struct {
TaskList string
ActivityType string
}

// ActivityTracker is a worker option to track executing activities on a worker
// Deprecated: in development and very likely to change
ActivityTracker interface {
// Start records activity execution
Start(info ActivityInfo) Stopper
// Stats returns a list of executing activity info
Stats() Activities
}

// Activities is a list of executing activities on the worker
// Deprecated: in development and very likely to change
Activities []struct {
Info ActivityInfo
Count int64
}
)
12 changes: 12 additions & 0 deletions internal/common/debug/workerstats_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type (
pollerTrackerNoopImpl struct{}
// stopperNoopImpl implements the Stopper interface
stopperNoopImpl struct{}
// activityTrackerNoopImpl implements the ActivityTracker interface
activityTrackerNoopImpl struct{}
)

func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} }
Expand All @@ -33,3 +35,13 @@ func (r *stopperNoopImpl) Stop() {}

// NewNoopPollerTracker creates a new PollerTracker instance
func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} }

func (at *activityTrackerNoopImpl) Start(info ActivityInfo) Stopper { return &stopperNoopImpl{} }
func (at *activityTrackerNoopImpl) Stats() Activities { return nil }

// NewNoopActivityTracker creates a new PollerTracker instance
func NewNoopActivityTracker() ActivityTracker { return &activityTrackerNoopImpl{} }

var _ PollerTracker = &pollerTrackerNoopImpl{}
var _ Stopper = &stopperNoopImpl{}
var _ ActivityTracker = &activityTrackerNoopImpl{}
3 changes: 3 additions & 0 deletions internal/common/debug/workerstats_noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import (

func TestWorkerStats(t *testing.T) {
pollerTracker := NewNoopPollerTracker()
activityTracker := NewNoopActivityTracker()
assert.NotNil(t, pollerTracker)
assert.NotNil(t, pollerTracker.Start())
assert.Equal(t, int32(0), pollerTracker.Stats())
assert.NotPanics(t, pollerTracker.Start().Stop)
assert.NotNil(t, activityTracker.Start(ActivityInfo{}))
assert.Nil(t, activityTracker.Stats())
}
10 changes: 9 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"sync/atomic"
"time"

"go.uber.org/cadence/internal/common/debug"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"go.uber.org/zap"
Expand Down Expand Up @@ -152,6 +154,7 @@ type (
contextPropagators []ContextPropagator
tracer opentracing.Tracer
featureFlags FeatureFlags
activityTracker debug.ActivityTracker
}

// history wrapper method to help information about events.
Expand Down Expand Up @@ -1417,6 +1420,7 @@ func newActivityTaskHandlerWithCustomProvider(
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
featureFlags: params.FeatureFlags,
activityTracker: params.WorkerStats.ActivityTracker,
}
}

Expand Down Expand Up @@ -1710,7 +1714,11 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
}
}()
}

activityInfo := debug.ActivityInfo{
TaskList: ath.taskListName,
ActivityType: activityType,
}
defer ath.activityTracker.Start(activityInfo).Stop()
output, err := activityImplementation.Execute(ctx, t.Input)

dlCancelFunc()
Expand Down
1 change: 1 addition & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() {
Tracer: opentracing.NoopTracer{},
},
}
ensureRequiredParams(&wep)
activityHandler := newActivityTaskHandler(mockService, wep, registry)
pats := &s.PollForActivityTaskResponse{
TaskToken: []byte("token"),
Expand Down
9 changes: 9 additions & 0 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"sync"
"time"

"go.uber.org/cadence/internal/common/debug"

"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -136,6 +138,7 @@ type (
dataConverter DataConverter
contextPropagators []ContextPropagator
tracer opentracing.Tracer
activityTracker debug.ActivityTracker
}

localActivityResult struct {
Expand Down Expand Up @@ -529,6 +532,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
dataConverter: params.DataConverter,
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
activityTracker: params.WorkerStats.ActivityTracker,
}
return &localActivityTaskPoller{
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
Expand Down Expand Up @@ -658,6 +662,11 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi

laStartTime := time.Now()
ctx, span := createOpenTracingActivitySpan(ctx, lath.tracer, time.Now(), task.params.ActivityType, task.params.WorkflowInfo.WorkflowExecution.ID, task.params.WorkflowInfo.WorkflowExecution.RunID)
activityInfo := debug.ActivityInfo{
TaskList: task.params.WorkflowInfo.TaskListName,
ActivityType: activityType,
}
defer lath.activityTracker.Start(activityInfo).Stop()
defer span.Finish()
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
executionLatency := time.Now().Sub(laStartTime)
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ func ensureRequiredParams(params *workerExecutionParameters) {
params.WorkerStats.PollerTracker = debug.NewNoopPollerTracker()
params.Logger.Debug("No PollerTracker configured for WorkerStats option. Will use the default.")
}
if params.WorkerStats.ActivityTracker == nil {
params.WorkerStats.ActivityTracker = debug.NewNoopActivityTracker()
params.Logger.Debug("No ActivityTracker configured for WorkerStats option. Will use the default.")
}
}

// verifyDomainExist does a DescribeDomain operation on the specified domain with backoff/retry
Expand Down
11 changes: 9 additions & 2 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,10 @@ func TestWorkerOptionDefaults(t *testing.T) {
Logger: decisionWorker.executionParameters.Logger,
MetricsScope: decisionWorker.executionParameters.MetricsScope,
Identity: decisionWorker.executionParameters.Identity,
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
WorkerStats: debug.WorkerStats{
PollerTracker: debug.NewNoopPollerTracker(),
ActivityTracker: debug.NewNoopActivityTracker(),
},
},
UserContext: decisionWorker.executionParameters.UserContext,
}
Expand Down Expand Up @@ -1161,7 +1164,10 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
Logger: options.Logger,
MetricsScope: options.MetricsScope,
Identity: options.Identity,
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
WorkerStats: debug.WorkerStats{
PollerTracker: debug.NewNoopPollerTracker(),
ActivityTracker: debug.NewNoopActivityTracker(),
},
},
}

Expand Down Expand Up @@ -1190,6 +1196,7 @@ func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParam
require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay)
require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution)
require.Equal(t, paramsA.WorkerStats.PollerTracker, paramsB.WorkerStats.PollerTracker)
require.Equal(t, paramsA.WorkerStats.ActivityTracker, paramsB.WorkerStats.ActivityTracker)
}

/*
Expand Down
12 changes: 8 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"sync"
"time"

"go.uber.org/cadence/internal/common/debug"

"github.com/facebookgo/clock"
"github.com/golang/mock/gomock"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -626,10 +628,11 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
},
}
taskHandler := localActivityTaskHandler{
userContext: env.workerOptions.BackgroundActivityContext,
metricsScope: env.metricsScope,
logger: env.logger,
tracer: opentracing.NoopTracer{},
userContext: env.workerOptions.BackgroundActivityContext,
metricsScope: env.metricsScope,
logger: env.logger,
tracer: opentracing.NoopTracer{},
activityTracker: debug.NewNoopActivityTracker(),
}

result := taskHandler.executeLocalActivityTask(task)
Expand Down Expand Up @@ -1195,6 +1198,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteLocalActivity(params executeLocal
dataConverter: wOptions.DataConverter,
tracer: wOptions.Tracer,
contextPropagators: wOptions.ContextPropagators,
activityTracker: debug.NewNoopActivityTracker(),
}

env.localActivities[activityID] = task
Expand Down

0 comments on commit 3d320e7

Please sign in to comment.