Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ActivityTracker to worker stats option #1362

Merged
merged 10 commits into from
Aug 29, 2024
12 changes: 12 additions & 0 deletions debug/interfaces.go → debug/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,16 @@ type (
// stats on the Worker for debugging purposes.
// Deprecated: in development and very likely to change
WorkerStats = internal.WorkerStats

// ActivityTracker is a worker option to track executing activities on a worker
// Deprecated: in development and very likely to change
ActivityTracker = internal.ActivityTracker

// ActivityInfo ...
// Deprecated: in development and very likely to change
ActivityInfo = internal.ActivityInfo

// Activities ...
// Deprecated: in development and very likely to change
Activities = internal.Activities
)
129 changes: 120 additions & 9 deletions internal/common/debug/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
package debug

import (
"encoding/json"
"fmt"

"go.uber.org/atomic"
"sync"
)

type (
Expand All @@ -36,8 +37,54 @@ type (
stopperImpl struct {
pollerTracker *pollerTrackerImpl
}

// activityTrackerImpl implements the ActivityTracker interface
activityTrackerImpl struct {
sync.RWMutex
m map[ActivityInfo]int64
ketsiambaku marked this conversation as resolved.
Show resolved Hide resolved
}

// activityStopperImpl implements the Stopper interface
activityStopperImpl struct {
sync.Once
info ActivityInfo
tracker *activityTrackerImpl
}
)

var _ ActivityTracker = &activityTrackerImpl{}
var _ Stopper = &activityStopperImpl{}

func (ati *activityTrackerImpl) Start(info ActivityInfo) Stopper {
ati.Lock()
ati.m[info]++
ati.Unlock()
ketsiambaku marked this conversation as resolved.
Show resolved Hide resolved
return &activityStopperImpl{info: info, tracker: ati}
}

func (ati *activityTrackerImpl) Stats() Activities {
Copy link
Contributor Author

@ketsiambaku ketsiambaku Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can group Activities by activityType or other fields of ActivityInfo if needed internally

var activities Activities
ati.RLock()
defer ati.RUnlock()
for a, count := range ati.m {
if count > 0 {
activities = append(activities, struct {
Info ActivityInfo
Count int64
}{Info: a, Count: count})
}
}
return activities
}

func (asi *activityStopperImpl) Stop() {
asi.Do(func() {
asi.tracker.Lock()
defer asi.tracker.Unlock()
asi.tracker.m[asi.info]--
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved
})
}

func (p *pollerTrackerImpl) Start() Stopper {
p.pollerCount.Inc()
return &stopperImpl{
Expand All @@ -58,24 +105,88 @@ func Example() {
pollerTracker = &pollerTrackerImpl{}

// Initially, poller count should be 0
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Start a poller and verify that the count increments
stopper1 := pollerTracker.Start()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Start another poller and verify that the count increments again
stopper2 := pollerTracker.Start()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Stop the pollers and verify the counter
stopper1.Stop()
stopper2.Stop()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

var activityTracker ActivityTracker
activityTracker = &activityTrackerImpl{m: make(map[ActivityInfo]int64)}

info1 := ActivityInfo{
WorkflowID: "id1",
RunID: "rid1",
TaskList: "task-list",
ActivityType: "activity",
}

info2 := ActivityInfo{
WorkflowID: "id2",
RunID: "rid2",
TaskList: "task-list",
ActivityType: "activity",
}

stopper1 = activityTracker.Start(info1)
stopper2 = activityTracker.Start(info2)
jsonActivities, _ := json.MarshalIndent(activityTracker.Stats(), "", " ")
fmt.Println(string(jsonActivities))

stopper1.Stop()
stopper1.Stop()
jsonActivities, _ = json.MarshalIndent(activityTracker.Stats(), "", " ")

fmt.Println(string(jsonActivities))
stopper2.Stop()

jsonActivities, _ = json.MarshalIndent(activityTracker.Stats(), "", " ")
fmt.Println(string(jsonActivities))

// Output:
// stats: 0
// stats: 1
// stats: 2
// stats: 0
// poller stats: 0
// poller stats: 1
// poller stats: 2
// poller stats: 0
// [
// {
// "Info": {
// "WorkflowID": "id1",
// "RunID": "rid1",
// "TaskList": "task-list",
// "ActivityType": "activity"
// },
// "Count": 1
// },
// {
// "Info": {
// "WorkflowID": "id2",
// "RunID": "rid2",
// "TaskList": "task-list",
// "ActivityType": "activity"
// },
// "Count": 1
// }
// ]
// [
// {
// "Info": {
// "WorkflowID": "id2",
// "RunID": "rid2",
// "TaskList": "task-list",
// "ActivityType": "activity"
// },
// "Count": 1
// }
// ]
// null
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ type (
// stats on the Worker for debugging purposes.
// Deprecated: in development and very likely to change
WorkerStats struct {
PollerTracker PollerTracker
PollerTracker PollerTracker
ActivityTracker ActivityTracker
}

// ActivityInfo ...
// Deprecated: in development and very likely to change
ActivityInfo struct {
WorkflowID string
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved
RunID string
TaskList string
ActivityType string
}

// ActivityTracker is a worker option to track executing activities on a worker
// Deprecated: in development and very likely to change
ActivityTracker interface {
// Start ...
Start(info ActivityInfo) Stopper
// Stats ...
Stats() Activities
}

// Activities ...
Activities []struct {
Info ActivityInfo
Count int64
}
)
12 changes: 12 additions & 0 deletions internal/common/debug/workerstats_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type (
pollerTrackerNoopImpl struct{}
// stopperNoopImpl implements the Stopper interface
stopperNoopImpl struct{}
// activityTrackerNoopImpl implements the ActivityTracker interface
activityTrackerNoopImpl struct{}
)

func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} }
Expand All @@ -33,3 +35,13 @@ func (r *stopperNoopImpl) Stop() {}

// NewNoopPollerTracker creates a new PollerTracker instance
func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} }

func (at *activityTrackerNoopImpl) Start(info ActivityInfo) Stopper { return &stopperNoopImpl{} }
func (at *activityTrackerNoopImpl) Stats() Activities { return nil }

// NewNoopActivityTracker creates a new PollerTracker instance
func NewNoopActivityTracker() ActivityTracker { return &activityTrackerNoopImpl{} }

var _ PollerTracker = &pollerTrackerNoopImpl{}
var _ Stopper = &stopperNoopImpl{}
var _ ActivityTracker = &activityTrackerNoopImpl{}
3 changes: 3 additions & 0 deletions internal/common/debug/workerstats_noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import (

func TestWorkerStats(t *testing.T) {
pollerTracker := NewNoopPollerTracker()
activityTracker := NewNoopActivityTracker()
assert.NotNil(t, pollerTracker)
assert.NotNil(t, pollerTracker.Start())
assert.Equal(t, int32(0), pollerTracker.Stats())
assert.NotPanics(t, pollerTracker.Start().Stop)
assert.NotNil(t, activityTracker.Start(ActivityInfo{}))
assert.Nil(t, activityTracker.Stats())
}
11 changes: 10 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"errors"
"fmt"
"go.uber.org/cadence/internal/common/debug"
"math"
"reflect"
"strings"
Expand Down Expand Up @@ -152,6 +153,7 @@ type (
contextPropagators []ContextPropagator
tracer opentracing.Tracer
featureFlags FeatureFlags
activityTracker debug.ActivityTracker
}

// history wrapper method to help information about events.
Expand Down Expand Up @@ -1417,6 +1419,7 @@ func newActivityTaskHandlerWithCustomProvider(
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
featureFlags: params.FeatureFlags,
activityTracker: params.WorkerStats.ActivityTracker,
}
}

Expand Down Expand Up @@ -1710,7 +1713,13 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
}
}()
}

debugInfo := debug.ActivityInfo{
ketsiambaku marked this conversation as resolved.
Show resolved Hide resolved
WorkflowID: *t.WorkflowExecution.WorkflowId,
RunID: *t.WorkflowExecution.RunId,
TaskList: ath.taskListName,
ActivityType: activityType,
}
defer ath.activityTracker.Start(debugInfo).Stop()
output, err := activityImplementation.Execute(ctx, t.Input)

dlCancelFunc()
Expand Down
1 change: 1 addition & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() {
Tracer: opentracing.NoopTracer{},
},
}
ensureRequiredParams(&wep)
activityHandler := newActivityTaskHandler(mockService, wep, registry)
pats := &s.PollForActivityTaskResponse{
TaskToken: []byte("token"),
Expand Down
10 changes: 10 additions & 0 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"errors"
"fmt"
"go.uber.org/cadence/internal/common/debug"
"sync"
"time"

Expand Down Expand Up @@ -136,6 +137,7 @@ type (
dataConverter DataConverter
contextPropagators []ContextPropagator
tracer opentracing.Tracer
activityTracker debug.ActivityTracker
}

localActivityResult struct {
Expand Down Expand Up @@ -529,6 +531,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
dataConverter: params.DataConverter,
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
activityTracker: params.WorkerStats.ActivityTracker,
}
return &localActivityTaskPoller{
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
Expand Down Expand Up @@ -658,6 +661,13 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi

laStartTime := time.Now()
ctx, span := createOpenTracingActivitySpan(ctx, lath.tracer, time.Now(), task.params.ActivityType, task.params.WorkflowInfo.WorkflowExecution.ID, task.params.WorkflowInfo.WorkflowExecution.RunID)
debugInfo := debug.ActivityInfo{
ketsiambaku marked this conversation as resolved.
Show resolved Hide resolved
WorkflowID: task.params.WorkflowInfo.WorkflowExecution.ID,
RunID: task.params.WorkflowInfo.WorkflowExecution.RunID,
TaskList: task.params.WorkflowInfo.TaskListName,
ActivityType: activityType,
}
defer lath.activityTracker.Start(debugInfo).Stop()
defer span.Finish()
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
executionLatency := time.Now().Sub(laStartTime)
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ func ensureRequiredParams(params *workerExecutionParameters) {
params.WorkerStats.PollerTracker = debug.NewNoopPollerTracker()
params.Logger.Debug("No PollerTracker configured for WorkerStats option. Will use the default.")
}
if params.WorkerStats.ActivityTracker == nil {
params.WorkerStats.ActivityTracker = debug.NewNoopActivityTracker()
params.Logger.Debug("No ActivityTracker configured for WorkerStats option. Will use the default.")
}
}

// verifyDomainExist does a DescribeDomain operation on the specified domain with backoff/retry
Expand Down
11 changes: 9 additions & 2 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,10 @@ func TestWorkerOptionDefaults(t *testing.T) {
Logger: decisionWorker.executionParameters.Logger,
MetricsScope: decisionWorker.executionParameters.MetricsScope,
Identity: decisionWorker.executionParameters.Identity,
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
WorkerStats: debug.WorkerStats{
PollerTracker: debug.NewNoopPollerTracker(),
ActivityTracker: debug.NewNoopActivityTracker(),
},
},
UserContext: decisionWorker.executionParameters.UserContext,
}
Expand Down Expand Up @@ -1161,7 +1164,10 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
Logger: options.Logger,
MetricsScope: options.MetricsScope,
Identity: options.Identity,
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
WorkerStats: debug.WorkerStats{
PollerTracker: debug.NewNoopPollerTracker(),
ActivityTracker: debug.NewNoopActivityTracker(),
},
},
}

Expand Down Expand Up @@ -1190,6 +1196,7 @@ func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParam
require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay)
require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution)
require.Equal(t, paramsA.WorkerStats.PollerTracker, paramsB.WorkerStats.PollerTracker)
require.Equal(t, paramsA.WorkerStats.ActivityTracker, paramsB.WorkerStats.ActivityTracker)
}

/*
Expand Down
Loading
Loading