Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ActivityTracker to worker stats option #1362

Merged
merged 10 commits into from
Aug 29, 2024
12 changes: 12 additions & 0 deletions debug/interfaces.go → debug/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,16 @@ type (
// stats on the Worker for debugging purposes.
// Deprecated: in development and very likely to change
WorkerStats = internal.WorkerStats

// ActivityTracker is a worker option to track executing activities on a worker
// Deprecated: in development and very likely to change
ActivityTracker = internal.ActivityTracker

// ActivityInfo contains details on the executing activity
// Deprecated: in development and very likely to change
ActivityInfo = internal.ActivityInfo

// Activities is a list of executing activities on the worker
// Deprecated: in development and very likely to change
Activities = internal.Activities
)
121 changes: 113 additions & 8 deletions internal/common/debug/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
package debug

import (
"encoding/json"
"fmt"
"sync"

"go.uber.org/atomic"
)
Expand All @@ -36,8 +38,57 @@ type (
stopperImpl struct {
pollerTracker *pollerTrackerImpl
}

// activityTrackerImpl implements the ActivityTracker interface
activityTrackerImpl struct {
sync.RWMutex
activityCount map[ActivityInfo]int64
}

// activityStopperImpl implements the Stopper interface
activityStopperImpl struct {
sync.Once
info ActivityInfo
tracker *activityTrackerImpl
}
)

var _ ActivityTracker = &activityTrackerImpl{}
var _ Stopper = &activityStopperImpl{}

func (ati *activityTrackerImpl) Start(info ActivityInfo) Stopper {
ati.Lock()
defer ati.Unlock()
ati.activityCount[info]++
return &activityStopperImpl{info: info, tracker: ati}
}

func (ati *activityTrackerImpl) Stats() Activities {
Copy link
Contributor Author

@ketsiambaku ketsiambaku Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can group Activities by activityType or other fields of ActivityInfo if needed internally

var activities Activities
ati.RLock()
defer ati.RUnlock()
for a, count := range ati.activityCount {
if count > 0 {
activities = append(activities, struct {
Info ActivityInfo
Count int64
}{Info: a, Count: count})
}
}
return activities
}

func (asi *activityStopperImpl) Stop() {
asi.Do(func() {
asi.tracker.Lock()
defer asi.tracker.Unlock()
asi.tracker.activityCount[asi.info]--
if asi.tracker.activityCount[asi.info] == 0 {
delete(asi.tracker.activityCount, asi.info)
}
})
}

func (p *pollerTrackerImpl) Start() Stopper {
p.pollerCount.Inc()
return &stopperImpl{
Expand All @@ -58,24 +109,78 @@ func Example() {
pollerTracker = &pollerTrackerImpl{}

// Initially, poller count should be 0
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Start a poller and verify that the count increments
stopper1 := pollerTracker.Start()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Start another poller and verify that the count increments again
stopper2 := pollerTracker.Start()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Stop the pollers and verify the counter
stopper1.Stop()
stopper2.Stop()
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

var activityTracker ActivityTracker
activityTracker = &activityTrackerImpl{activityCount: make(map[ActivityInfo]int64)}

info1 := ActivityInfo{
TaskList: "task-list",
ActivityType: "activity1",
}

info2 := ActivityInfo{
TaskList: "task-list",
ActivityType: "activity2",
}

stopper1 = activityTracker.Start(info1)
stopper2 = activityTracker.Start(info2)
jsonActivities, _ := json.MarshalIndent(activityTracker.Stats(), "", " ")
fmt.Println(string(jsonActivities))

stopper1.Stop()
stopper1.Stop()
jsonActivities, _ = json.MarshalIndent(activityTracker.Stats(), "", " ")

fmt.Println(string(jsonActivities))
stopper2.Stop()

jsonActivities, _ = json.MarshalIndent(activityTracker.Stats(), "", " ")
fmt.Println(string(jsonActivities))

// Output:
// stats: 0
// stats: 1
// stats: 2
// stats: 0
// poller stats: 0
// poller stats: 1
// poller stats: 2
// poller stats: 0
// [
// {
// "Info": {
// "TaskList": "task-list",
// "ActivityType": "activity1"
// },
// "Count": 1
// },
// {
// "Info": {
// "TaskList": "task-list",
// "ActivityType": "activity2"
// },
// "Count": 1
// }
// ]
// [
// {
// "Info": {
// "TaskList": "task-list",
// "ActivityType": "activity2"
// },
// "Count": 1
// }
// ]
// null
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,30 @@ type (
// stats on the Worker for debugging purposes.
// Deprecated: in development and very likely to change
WorkerStats struct {
PollerTracker PollerTracker
PollerTracker PollerTracker
ActivityTracker ActivityTracker
}

// ActivityInfo contains details on the executing activity
// Deprecated: in development and very likely to change
ActivityInfo struct {
TaskList string
ActivityType string
}

// ActivityTracker is a worker option to track executing activities on a worker
// Deprecated: in development and very likely to change
ActivityTracker interface {
// Start records activity execution
Start(info ActivityInfo) Stopper
// Stats returns a list of executing activity info
Stats() Activities
}

// Activities is a list of executing activities on the worker
// Deprecated: in development and very likely to change
Activities []struct {
Info ActivityInfo
Count int64
}
)
12 changes: 12 additions & 0 deletions internal/common/debug/workerstats_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type (
pollerTrackerNoopImpl struct{}
// stopperNoopImpl implements the Stopper interface
stopperNoopImpl struct{}
// activityTrackerNoopImpl implements the ActivityTracker interface
activityTrackerNoopImpl struct{}
)

func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} }
Expand All @@ -33,3 +35,13 @@ func (r *stopperNoopImpl) Stop() {}

// NewNoopPollerTracker creates a new PollerTracker instance
func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} }

func (at *activityTrackerNoopImpl) Start(info ActivityInfo) Stopper { return &stopperNoopImpl{} }
func (at *activityTrackerNoopImpl) Stats() Activities { return nil }

// NewNoopActivityTracker creates a new PollerTracker instance
func NewNoopActivityTracker() ActivityTracker { return &activityTrackerNoopImpl{} }

var _ PollerTracker = &pollerTrackerNoopImpl{}
var _ Stopper = &stopperNoopImpl{}
var _ ActivityTracker = &activityTrackerNoopImpl{}
3 changes: 3 additions & 0 deletions internal/common/debug/workerstats_noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import (

func TestWorkerStats(t *testing.T) {
pollerTracker := NewNoopPollerTracker()
activityTracker := NewNoopActivityTracker()
assert.NotNil(t, pollerTracker)
assert.NotNil(t, pollerTracker.Start())
assert.Equal(t, int32(0), pollerTracker.Stats())
assert.NotPanics(t, pollerTracker.Start().Stop)
assert.NotNil(t, activityTracker.Start(ActivityInfo{}))
assert.Nil(t, activityTracker.Stats())
}
10 changes: 9 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"sync/atomic"
"time"

"go.uber.org/cadence/internal/common/debug"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"go.uber.org/zap"
Expand Down Expand Up @@ -152,6 +154,7 @@ type (
contextPropagators []ContextPropagator
tracer opentracing.Tracer
featureFlags FeatureFlags
activityTracker debug.ActivityTracker
}

// history wrapper method to help information about events.
Expand Down Expand Up @@ -1417,6 +1420,7 @@ func newActivityTaskHandlerWithCustomProvider(
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
featureFlags: params.FeatureFlags,
activityTracker: params.WorkerStats.ActivityTracker,
}
}

Expand Down Expand Up @@ -1710,7 +1714,11 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
}
}()
}

activityInfo := debug.ActivityInfo{
TaskList: ath.taskListName,
ActivityType: activityType,
}
defer ath.activityTracker.Start(activityInfo).Stop()
output, err := activityImplementation.Execute(ctx, t.Input)

dlCancelFunc()
Expand Down
1 change: 1 addition & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() {
Tracer: opentracing.NoopTracer{},
},
}
ensureRequiredParams(&wep)
activityHandler := newActivityTaskHandler(mockService, wep, registry)
pats := &s.PollForActivityTaskResponse{
TaskToken: []byte("token"),
Expand Down
9 changes: 9 additions & 0 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"sync"
"time"

"go.uber.org/cadence/internal/common/debug"

"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -136,6 +138,7 @@ type (
dataConverter DataConverter
contextPropagators []ContextPropagator
tracer opentracing.Tracer
activityTracker debug.ActivityTracker
}

localActivityResult struct {
Expand Down Expand Up @@ -529,6 +532,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
dataConverter: params.DataConverter,
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
activityTracker: params.WorkerStats.ActivityTracker,
}
return &localActivityTaskPoller{
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
Expand Down Expand Up @@ -658,6 +662,11 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi

laStartTime := time.Now()
ctx, span := createOpenTracingActivitySpan(ctx, lath.tracer, time.Now(), task.params.ActivityType, task.params.WorkflowInfo.WorkflowExecution.ID, task.params.WorkflowInfo.WorkflowExecution.RunID)
activityInfo := debug.ActivityInfo{
TaskList: task.params.WorkflowInfo.TaskListName,
ActivityType: activityType,
}
defer lath.activityTracker.Start(activityInfo).Stop()
defer span.Finish()
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
executionLatency := time.Now().Sub(laStartTime)
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ func ensureRequiredParams(params *workerExecutionParameters) {
params.WorkerStats.PollerTracker = debug.NewNoopPollerTracker()
params.Logger.Debug("No PollerTracker configured for WorkerStats option. Will use the default.")
}
if params.WorkerStats.ActivityTracker == nil {
params.WorkerStats.ActivityTracker = debug.NewNoopActivityTracker()
params.Logger.Debug("No ActivityTracker configured for WorkerStats option. Will use the default.")
}
}

// verifyDomainExist does a DescribeDomain operation on the specified domain with backoff/retry
Expand Down
11 changes: 9 additions & 2 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,10 @@ func TestWorkerOptionDefaults(t *testing.T) {
Logger: decisionWorker.executionParameters.Logger,
MetricsScope: decisionWorker.executionParameters.MetricsScope,
Identity: decisionWorker.executionParameters.Identity,
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
WorkerStats: debug.WorkerStats{
PollerTracker: debug.NewNoopPollerTracker(),
ActivityTracker: debug.NewNoopActivityTracker(),
},
},
UserContext: decisionWorker.executionParameters.UserContext,
}
Expand Down Expand Up @@ -1161,7 +1164,10 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
Logger: options.Logger,
MetricsScope: options.MetricsScope,
Identity: options.Identity,
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
WorkerStats: debug.WorkerStats{
PollerTracker: debug.NewNoopPollerTracker(),
ActivityTracker: debug.NewNoopActivityTracker(),
},
},
}

Expand Down Expand Up @@ -1190,6 +1196,7 @@ func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParam
require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay)
require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution)
require.Equal(t, paramsA.WorkerStats.PollerTracker, paramsB.WorkerStats.PollerTracker)
require.Equal(t, paramsA.WorkerStats.ActivityTracker, paramsB.WorkerStats.ActivityTracker)
}

/*
Expand Down
12 changes: 8 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"sync"
"time"

"go.uber.org/cadence/internal/common/debug"

"github.com/facebookgo/clock"
"github.com/golang/mock/gomock"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -626,10 +628,11 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
},
}
taskHandler := localActivityTaskHandler{
userContext: env.workerOptions.BackgroundActivityContext,
metricsScope: env.metricsScope,
logger: env.logger,
tracer: opentracing.NoopTracer{},
userContext: env.workerOptions.BackgroundActivityContext,
metricsScope: env.metricsScope,
logger: env.logger,
tracer: opentracing.NoopTracer{},
activityTracker: debug.NewNoopActivityTracker(),
}

result := taskHandler.executeLocalActivityTask(task)
Expand Down Expand Up @@ -1195,6 +1198,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteLocalActivity(params executeLocal
dataConverter: wOptions.DataConverter,
tracer: wOptions.Tracer,
contextPropagators: wOptions.ContextPropagators,
activityTracker: debug.NewNoopActivityTracker(),
}

env.localActivities[activityID] = task
Expand Down
Loading