Skip to content

Commit

Permalink
[Disk Manager] add RunSystemTasks to tasks config, run system tasks i…
Browse files Browse the repository at this point in the history
…n controlplane only, clean up lister metrics when collectListerMetricsTask finishes
  • Loading branch information
gy2411 committed Nov 14, 2024
1 parent b432ff6 commit 085a114
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 77 deletions.
32 changes: 31 additions & 1 deletion cloud/tasks/collect_lister_metrics_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func (c *collectListerMetricsTask) Run(
defer ticker.Stop()

for range ticker.C {
defer c.cleanupTasksMetrics(
storage.TaskStatusToString(storage.TaskStatusReadyToRun),
)
err := c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
Expand All @@ -59,6 +62,9 @@ func (c *collectListerMetricsTask) Run(
return err
}

defer c.cleanupTasksMetrics(
storage.TaskStatusToString(storage.TaskStatusRunning),
)
err = c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
Expand All @@ -73,6 +79,9 @@ func (c *collectListerMetricsTask) Run(
return err
}

defer c.cleanupTasksMetrics(
storage.TaskStatusToString(storage.TaskStatusReadyToCancel),
)
err = c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
Expand All @@ -88,6 +97,9 @@ func (c *collectListerMetricsTask) Run(
return err
}

defer c.cleanupTasksMetrics(
storage.TaskStatusToString(storage.TaskStatusCancelling),
)
err = c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
Expand All @@ -102,6 +114,7 @@ func (c *collectListerMetricsTask) Run(
return err
}

defer c.cleanupHangingTasksMetrics()
err = c.collectHangingTasksMetrics(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -195,7 +208,7 @@ func (c *collectListerMetricsTask) collectHangingTasksMetrics(
"Task with id %s is not hanging anymore",
id,
)
gauge.Set(0)
gauge.Set(float64(0))
delete(c.hangingTaskGaugesByID, id)
}
}
Expand Down Expand Up @@ -228,3 +241,20 @@ func (c *collectListerMetricsTask) collectHangingTasksMetrics(

return nil
}

func (c *collectListerMetricsTask) cleanupTasksMetrics(sensor string) {
for _, taskType := range c.taskTypes {
subRegistry := c.registry.WithTags(map[string]string{
"type": taskType,
})
subRegistry.Gauge(sensor).Set(float64(0))
}
}

func (c *collectListerMetricsTask) cleanupHangingTasksMetrics() {
c.cleanupTasksMetrics(totalHangingTaskCountGaugeName)

for _, gauge := range c.hangingTaskGaugesByID {
gauge.Set(float64(0))
}
}
2 changes: 2 additions & 0 deletions cloud/tasks/config/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ message TasksConfig {
// Needed for tracing.
// Spans of tasks with greater generation id will not be sampled.
optional uint64 MaxSampledTaskGeneration = 32 [default = 100];

optional bool RunSystemTasks = 33 [default = true];
}
107 changes: 107 additions & 0 deletions cloud/tasks/register_system_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package tasks

import (
"context"
"time"

tasks_config "github.com/ydb-platform/nbs/cloud/tasks/config"
"github.com/ydb-platform/nbs/cloud/tasks/metrics"
tasks_storage "github.com/ydb-platform/nbs/cloud/tasks/storage"
)

////////////////////////////////////////////////////////////////////////////////

func RegisterSystemTasks(
ctx context.Context,
registry *Registry,
storage tasks_storage.Storage,
config *tasks_config.TasksConfig,
tasksMetricsRegistry metrics.Registry,
taskScheduler Scheduler,
) error {

err := registry.RegisterForExecution("tasks.Blank", func() Task {
return &blankTask{}
})
if err != nil {
return err
}

endedTaskExpirationTimeout, err := time.ParseDuration(
config.GetEndedTaskExpirationTimeout(),
)
if err != nil {
return err
}

clearEndedTasksTaskScheduleInterval, err := time.ParseDuration(
config.GetClearEndedTasksTaskScheduleInterval(),
)
if err != nil {
return err
}

err = registry.RegisterForExecution("tasks.ClearEndedTasks", func() Task {
return &clearEndedTasksTask{
storage: storage,
expirationTimeout: endedTaskExpirationTimeout,
limit: int(config.GetClearEndedTasksLimit()),
}
})
if err != nil {
return err
}

taskScheduler.ScheduleRegularTasks(
ctx,
"tasks.ClearEndedTasks",
TaskSchedule{
ScheduleInterval: clearEndedTasksTaskScheduleInterval,
MaxTasksInflight: 1,
},
)

listerMetricsCollectionInterval, err := time.ParseDuration(
config.GetListerMetricsCollectionInterval(),
)
if err != nil {
return err
}

err = registry.RegisterForExecution(
"tasks.CollectListerMetrics", func() Task {
return &collectListerMetricsTask{
registry: tasksMetricsRegistry,
storage: storage,
metricsCollectionInterval: listerMetricsCollectionInterval,

// This task is registered in control plane and collects metrics
// for all types of tasks.
taskTypes: registry.TaskTypes(),
hangingTaskGaugesByID: make(map[string]metrics.Gauge),
maxHangingTaskIDsToReport: config.GetMaxHangingTaskIDsToReport(),
}
},
)
if err != nil {
return err
}

collectListerMetricsTaskScheduleInterval, err := time.ParseDuration(
config.GetCollectListerMetricsTaskScheduleInterval(),
)
if err != nil {
return err
}

taskScheduler.ScheduleRegularTasks(
ctx,
"tasks.CollectListerMetrics",
TaskSchedule{
ScheduleInterval: collectListerMetricsTaskScheduleInterval,
MaxTasksInflight: 1,
},
)

return nil
}
12 changes: 12 additions & 0 deletions cloud/tasks/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ type Registry struct {
taskFactoriesMutex sync.RWMutex
}

func (r *Registry) TaskTypes() []string {
r.taskFactoriesMutex.RLock()
defer r.taskFactoriesMutex.RUnlock()

var taskTypes []string
for taskType := range r.taskFactories {
taskTypes = append(taskTypes, taskType)
}

return taskTypes
}

func (r *Registry) TaskTypesForExecution() []string {
r.taskFactoriesMutex.RLock()
defer r.taskFactoriesMutex.RUnlock()
Expand Down
87 changes: 11 additions & 76 deletions cloud/tasks/scheduler_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,18 +558,6 @@ func NewScheduler(
return nil, err
}

endedTaskExpirationTimeout, err := time.ParseDuration(config.GetEndedTaskExpirationTimeout())
if err != nil {
return nil, err
}

clearEndedTasksTaskScheduleInterval, err := time.ParseDuration(
config.GetClearEndedTasksTaskScheduleInterval(),
)
if err != nil {
return nil, err
}

s := &scheduler{
registry: registry,
storage: storage,
Expand All @@ -579,72 +567,19 @@ func NewScheduler(
scheduleRegularTasksPeriodMax: scheduleRegularTasksPeriodMax,
}

err = registry.RegisterForExecution("tasks.Blank", func() Task {
return &blankTask{}
})
if err != nil {
return nil, err
}

err = registry.RegisterForExecution("tasks.ClearEndedTasks", func() Task {
return &clearEndedTasksTask{
storage: storage,
expirationTimeout: endedTaskExpirationTimeout,
limit: int(config.GetClearEndedTasksLimit()),
if config.GetRunSystemTasks() {
err = RegisterSystemTasks(
ctx,
s.registry,
s.storage,
config,
metricsRegistry,
s,
)
if err != nil {
return nil, err
}
})
if err != nil {
return nil, err
}

s.ScheduleRegularTasks(
ctx,
"tasks.ClearEndedTasks",
TaskSchedule{
ScheduleInterval: clearEndedTasksTaskScheduleInterval,
MaxTasksInflight: 1,
},
)

listerMetricsCollectionInterval, err := time.ParseDuration(
config.GetListerMetricsCollectionInterval(),
)
if err != nil {
return nil, err
}

err = registry.RegisterForExecution(
"tasks.CollectListerMetrics", func() Task {
return &collectListerMetricsTask{
registry: metricsRegistry,
storage: storage,
metricsCollectionInterval: listerMetricsCollectionInterval,

taskTypes: registry.TaskTypesForExecution(),
hangingTaskGaugesByID: make(map[string]metrics.Gauge),
maxHangingTaskIDsToReport: config.GetMaxHangingTaskIDsToReport(),
}
},
)
if err != nil {
return nil, err
}

collectListerMetricsTaskScheduleInterval, err := time.ParseDuration(
config.GetCollectListerMetricsTaskScheduleInterval(),
)
if err != nil {
return nil, err
}

s.ScheduleRegularTasks(
ctx,
"tasks.CollectListerMetrics",
TaskSchedule{
ScheduleInterval: collectListerMetricsTaskScheduleInterval,
MaxTasksInflight: 1,
},
)

return s, nil
}
Loading

0 comments on commit 085a114

Please sign in to comment.