Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ketsiambaku committed Aug 29, 2024
1 parent a045790 commit a1086da
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 31 deletions.
35 changes: 14 additions & 21 deletions internal/common/debug/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type (
// activityTrackerImpl implements the ActivityTracker interface
activityTrackerImpl struct {
sync.RWMutex
m map[ActivityInfo]int64
activityCount map[ActivityInfo]int64
}

// activityStopperImpl implements the Stopper interface
Expand All @@ -58,16 +58,16 @@ var _ Stopper = &activityStopperImpl{}

func (ati *activityTrackerImpl) Start(info ActivityInfo) Stopper {
ati.Lock()
ati.m[info]++
ati.Unlock()
defer ati.Unlock()
ati.activityCount[info]++
return &activityStopperImpl{info: info, tracker: ati}
}

func (ati *activityTrackerImpl) Stats() Activities {
var activities Activities
ati.RLock()
defer ati.RUnlock()
for a, count := range ati.m {
for a, count := range ati.activityCount {
if count > 0 {
activities = append(activities, struct {
Info ActivityInfo
Expand All @@ -82,7 +82,10 @@ func (asi *activityStopperImpl) Stop() {
asi.Do(func() {
asi.tracker.Lock()
defer asi.tracker.Unlock()
asi.tracker.m[asi.info]--
asi.tracker.activityCount[asi.info]--
if asi.tracker.activityCount[asi.info] == 0 {
delete(asi.tracker.activityCount, asi.info)
}
})
}

Expand Down Expand Up @@ -122,20 +125,16 @@ func Example() {
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

var activityTracker ActivityTracker
activityTracker = &activityTrackerImpl{m: make(map[ActivityInfo]int64)}
activityTracker = &activityTrackerImpl{activityCount: make(map[ActivityInfo]int64)}

info1 := ActivityInfo{
WorkflowID: "id1",
RunID: "rid1",
TaskList: "task-list",
ActivityType: "activity",
ActivityType: "activity1",
}

info2 := ActivityInfo{
WorkflowID: "id2",
RunID: "rid2",
TaskList: "task-list",
ActivityType: "activity",
ActivityType: "activity2",
}

stopper1 = activityTracker.Start(info1)
Expand All @@ -161,30 +160,24 @@ func Example() {
// [
// {
// "Info": {
// "WorkflowID": "id1",
// "RunID": "rid1",
// "TaskList": "task-list",
// "ActivityType": "activity"
// "ActivityType": "activity1"
// },
// "Count": 1
// },
// {
// "Info": {
// "WorkflowID": "id2",
// "RunID": "rid2",
// "TaskList": "task-list",
// "ActivityType": "activity"
// "ActivityType": "activity2"
// },
// "Count": 1
// }
// ]
// [
// {
// "Info": {
// "WorkflowID": "id2",
// "RunID": "rid2",
// "TaskList": "task-list",
// "ActivityType": "activity"
// "ActivityType": "activity2"
// },
// "Count": 1
// }
Expand Down
2 changes: 0 additions & 2 deletions internal/common/debug/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ type (
// ActivityInfo contains details on the executing activity
// Deprecated: in development and very likely to change
ActivityInfo struct {
WorkflowID string
RunID string
TaskList string
ActivityType string
}
Expand Down
6 changes: 2 additions & 4 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1714,13 +1714,11 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
}
}()
}
debugInfo := debug.ActivityInfo{
WorkflowID: *t.WorkflowExecution.WorkflowId,
RunID: *t.WorkflowExecution.RunId,
activityInfo := debug.ActivityInfo{
TaskList: ath.taskListName,
ActivityType: activityType,
}
defer ath.activityTracker.Start(debugInfo).Stop()
defer ath.activityTracker.Start(activityInfo).Stop()
output, err := activityImplementation.Execute(ctx, t.Input)

dlCancelFunc()
Expand Down
6 changes: 2 additions & 4 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,13 +662,11 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi

laStartTime := time.Now()
ctx, span := createOpenTracingActivitySpan(ctx, lath.tracer, time.Now(), task.params.ActivityType, task.params.WorkflowInfo.WorkflowExecution.ID, task.params.WorkflowInfo.WorkflowExecution.RunID)
debugInfo := debug.ActivityInfo{
WorkflowID: task.params.WorkflowInfo.WorkflowExecution.ID,
RunID: task.params.WorkflowInfo.WorkflowExecution.RunID,
activityInfo := debug.ActivityInfo{
TaskList: task.params.WorkflowInfo.TaskListName,
ActivityType: activityType,
}
defer lath.activityTracker.Start(debugInfo).Stop()
defer lath.activityTracker.Start(activityInfo).Stop()
defer span.Finish()
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
executionLatency := time.Now().Sub(laStartTime)
Expand Down

0 comments on commit a1086da

Please sign in to comment.