From 085a114380fd091b396b7862eff02ea61644d007 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Thu, 14 Nov 2024 13:46:18 +0300 Subject: [PATCH] [Disk Manager] add RunSystemTasks to tasks config, run system tasks in controlplane only, clean up lister metrics when collectListerMetricsTask finishes --- cloud/tasks/collect_lister_metrics_task.go | 32 ++++- cloud/tasks/config/config.proto | 2 + cloud/tasks/register_system_tasks.go | 107 ++++++++++++++++ cloud/tasks/registry.go | 12 ++ cloud/tasks/scheduler_impl.go | 87 ++----------- cloud/tasks/tasks_tests/tasks_test.go | 141 +++++++++++++++++++++ cloud/tasks/ya.make | 1 + 7 files changed, 305 insertions(+), 77 deletions(-) create mode 100644 cloud/tasks/register_system_tasks.go diff --git a/cloud/tasks/collect_lister_metrics_task.go b/cloud/tasks/collect_lister_metrics_task.go index 233a7d7da9e..e8b37f12e33 100644 --- a/cloud/tasks/collect_lister_metrics_task.go +++ b/cloud/tasks/collect_lister_metrics_task.go @@ -44,6 +44,9 @@ func (c *collectListerMetricsTask) Run( defer ticker.Stop() for range ticker.C { + defer c.cleanupTasksMetrics( + storage.TaskStatusToString(storage.TaskStatusReadyToRun), + ) err := c.collectTasksMetrics( ctx, func(context.Context) ([]storage.TaskInfo, error) { @@ -59,6 +62,9 @@ func (c *collectListerMetricsTask) Run( return err } + defer c.cleanupTasksMetrics( + storage.TaskStatusToString(storage.TaskStatusRunning), + ) err = c.collectTasksMetrics( ctx, func(context.Context) ([]storage.TaskInfo, error) { @@ -73,6 +79,9 @@ func (c *collectListerMetricsTask) Run( return err } + defer c.cleanupTasksMetrics( + storage.TaskStatusToString(storage.TaskStatusReadyToCancel), + ) err = c.collectTasksMetrics( ctx, func(context.Context) ([]storage.TaskInfo, error) { @@ -88,6 +97,9 @@ func (c *collectListerMetricsTask) Run( return err } + defer c.cleanupTasksMetrics( + storage.TaskStatusToString(storage.TaskStatusCancelling), + ) err = c.collectTasksMetrics( ctx, func(context.Context) ([]storage.TaskInfo, 
error) { @@ -102,6 +114,7 @@ func (c *collectListerMetricsTask) Run( return err } + defer c.cleanupHangingTasksMetrics() err = c.collectHangingTasksMetrics(ctx) if err != nil { return err @@ -195,7 +208,7 @@ func (c *collectListerMetricsTask) collectHangingTasksMetrics( "Task with id %s is not hanging anymore", id, ) - gauge.Set(0) + gauge.Set(float64(0)) delete(c.hangingTaskGaugesByID, id) } } @@ -228,3 +241,20 @@ func (c *collectListerMetricsTask) collectHangingTasksMetrics( return nil } + +func (c *collectListerMetricsTask) cleanupTasksMetrics(sensor string) { + for _, taskType := range c.taskTypes { + subRegistry := c.registry.WithTags(map[string]string{ + "type": taskType, + }) + subRegistry.Gauge(sensor).Set(float64(0)) + } +} + +func (c *collectListerMetricsTask) cleanupHangingTasksMetrics() { + c.cleanupTasksMetrics(totalHangingTaskCountGaugeName) + + for _, gauge := range c.hangingTaskGaugesByID { + gauge.Set(float64(0)) + } +} diff --git a/cloud/tasks/config/config.proto b/cloud/tasks/config/config.proto index 719c90899aa..0d73e872c67 100644 --- a/cloud/tasks/config/config.proto +++ b/cloud/tasks/config/config.proto @@ -56,4 +56,6 @@ message TasksConfig { // Needed for tracing. // Spans of tasks with greater generation id will not be sampled. 
optional uint64 MaxSampledTaskGeneration = 32 [default = 100]; + + optional bool RunSystemTasks = 33 [default = true]; } diff --git a/cloud/tasks/register_system_tasks.go b/cloud/tasks/register_system_tasks.go new file mode 100644 index 00000000000..bf9a767e288 --- /dev/null +++ b/cloud/tasks/register_system_tasks.go @@ -0,0 +1,107 @@ +package tasks + +import ( + "context" + "time" + + tasks_config "github.com/ydb-platform/nbs/cloud/tasks/config" + "github.com/ydb-platform/nbs/cloud/tasks/metrics" + tasks_storage "github.com/ydb-platform/nbs/cloud/tasks/storage" +) + +//////////////////////////////////////////////////////////////////////////////// + +func RegisterSystemTasks( + ctx context.Context, + registry *Registry, + storage tasks_storage.Storage, + config *tasks_config.TasksConfig, + tasksMetricsRegistry metrics.Registry, + taskScheduler Scheduler, +) error { + + err := registry.RegisterForExecution("tasks.Blank", func() Task { + return &blankTask{} + }) + if err != nil { + return err + } + + endedTaskExpirationTimeout, err := time.ParseDuration( + config.GetEndedTaskExpirationTimeout(), + ) + if err != nil { + return err + } + + clearEndedTasksTaskScheduleInterval, err := time.ParseDuration( + config.GetClearEndedTasksTaskScheduleInterval(), + ) + if err != nil { + return err + } + + err = registry.RegisterForExecution("tasks.ClearEndedTasks", func() Task { + return &clearEndedTasksTask{ + storage: storage, + expirationTimeout: endedTaskExpirationTimeout, + limit: int(config.GetClearEndedTasksLimit()), + } + }) + if err != nil { + return err + } + + taskScheduler.ScheduleRegularTasks( + ctx, + "tasks.ClearEndedTasks", + TaskSchedule{ + ScheduleInterval: clearEndedTasksTaskScheduleInterval, + MaxTasksInflight: 1, + }, + ) + + listerMetricsCollectionInterval, err := time.ParseDuration( + config.GetListerMetricsCollectionInterval(), + ) + if err != nil { + return err + } + + err = registry.RegisterForExecution( + "tasks.CollectListerMetrics", func() Task { + 
return &collectListerMetricsTask{ + registry: tasksMetricsRegistry, + storage: storage, + metricsCollectionInterval: listerMetricsCollectionInterval, + + // This task is registered in control plane and collects metrics + // for all types of tasks. + taskTypes: registry.TaskTypes(), + hangingTaskGaugesByID: make(map[string]metrics.Gauge), + maxHangingTaskIDsToReport: config.GetMaxHangingTaskIDsToReport(), + } + }, + ) + if err != nil { + return err + } + + collectListerMetricsTaskScheduleInterval, err := time.ParseDuration( + config.GetCollectListerMetricsTaskScheduleInterval(), + ) + if err != nil { + return err + } + + taskScheduler.ScheduleRegularTasks( + ctx, + "tasks.CollectListerMetrics", + TaskSchedule{ + ScheduleInterval: collectListerMetricsTaskScheduleInterval, + MaxTasksInflight: 1, + }, + ) + + return nil +} diff --git a/cloud/tasks/registry.go b/cloud/tasks/registry.go index 1227b799678..ecb69b0625a 100644 --- a/cloud/tasks/registry.go +++ b/cloud/tasks/registry.go @@ -18,6 +18,18 @@ type Registry struct { taskFactoriesMutex sync.RWMutex } +func (r *Registry) TaskTypes() []string { + r.taskFactoriesMutex.RLock() + defer r.taskFactoriesMutex.RUnlock() + + var taskTypes []string + for taskType := range r.taskFactories { + taskTypes = append(taskTypes, taskType) + } + + return taskTypes +} + func (r *Registry) TaskTypesForExecution() []string { r.taskFactoriesMutex.RLock() defer r.taskFactoriesMutex.RUnlock() diff --git a/cloud/tasks/scheduler_impl.go b/cloud/tasks/scheduler_impl.go index 1bbb84bcf03..b9a3f411b96 100644 --- a/cloud/tasks/scheduler_impl.go +++ b/cloud/tasks/scheduler_impl.go @@ -558,18 +558,6 @@ func NewScheduler( return nil, err } - endedTaskExpirationTimeout, err := time.ParseDuration(config.GetEndedTaskExpirationTimeout()) - if err != nil { - return nil, err - } - - clearEndedTasksTaskScheduleInterval, err := time.ParseDuration( - config.GetClearEndedTasksTaskScheduleInterval(), - ) - if err != nil { - return nil, err - } - s := 
&scheduler{ registry: registry, storage: storage, @@ -579,72 +567,19 @@ func NewScheduler( scheduleRegularTasksPeriodMax: scheduleRegularTasksPeriodMax, } - err = registry.RegisterForExecution("tasks.Blank", func() Task { - return &blankTask{} - }) - if err != nil { - return nil, err - } - - err = registry.RegisterForExecution("tasks.ClearEndedTasks", func() Task { - return &clearEndedTasksTask{ - storage: storage, - expirationTimeout: endedTaskExpirationTimeout, - limit: int(config.GetClearEndedTasksLimit()), + if config.GetRunSystemTasks() { + err = RegisterSystemTasks( + ctx, + s.registry, + s.storage, + config, + metricsRegistry, + s, + ) + if err != nil { + return nil, err } - }) - if err != nil { - return nil, err - } - - s.ScheduleRegularTasks( - ctx, - "tasks.ClearEndedTasks", - TaskSchedule{ - ScheduleInterval: clearEndedTasksTaskScheduleInterval, - MaxTasksInflight: 1, - }, - ) - - listerMetricsCollectionInterval, err := time.ParseDuration( - config.GetListerMetricsCollectionInterval(), - ) - if err != nil { - return nil, err - } - - err = registry.RegisterForExecution( - "tasks.CollectListerMetrics", func() Task { - return &collectListerMetricsTask{ - registry: metricsRegistry, - storage: storage, - metricsCollectionInterval: listerMetricsCollectionInterval, - - taskTypes: registry.TaskTypesForExecution(), - hangingTaskGaugesByID: make(map[string]metrics.Gauge), - maxHangingTaskIDsToReport: config.GetMaxHangingTaskIDsToReport(), - } - }, - ) - if err != nil { - return nil, err } - collectListerMetricsTaskScheduleInterval, err := time.ParseDuration( - config.GetCollectListerMetricsTaskScheduleInterval(), - ) - if err != nil { - return nil, err - } - - s.ScheduleRegularTasks( - ctx, - "tasks.CollectListerMetrics", - TaskSchedule{ - ScheduleInterval: collectListerMetricsTaskScheduleInterval, - MaxTasksInflight: 1, - }, - ) - return s, nil } diff --git a/cloud/tasks/tasks_tests/tasks_test.go b/cloud/tasks/tasks_tests/tasks_test.go index 
a384a992d5b..a45114f6ae0 100644 --- a/cloud/tasks/tasks_tests/tasks_test.go +++ b/cloud/tasks/tasks_tests/tasks_test.go @@ -312,6 +312,12 @@ func registerLongTask(registry *tasks.Registry) error { }) } +func registerLongTaskNotForExecution(registry *tasks.Registry) error { + return registry.Register("long", func() tasks.Task { + return &longTask{} + }) +} + func scheduleLongTask( ctx context.Context, scheduler tasks.Scheduler, @@ -1307,3 +1313,138 @@ func TestHangingTasksMetrics(t *testing.T) { gaugeUnsetWg.Wait() registry.AssertAllExpectations(t) } + +func TestHangingMetricsInitialization(t *testing.T) { + ctx, cancel := context.WithCancel(newContext()) + defer cancel() + + db, err := newYDB(ctx) + require.NoError(t, err) + defer db.Close(ctx) + + registry := mocks.NewIgnoreUnknownCallsRegistryMock() + + config := newHangingTaskTestConfig() + + s := createServicesWithConfig(t, ctx, db, config, registry) + err = registerHangingTask(s.registry) + require.NoError(t, err) + err = registerLongTaskNotForExecution(s.registry) + require.NoError(t, err) + + err = s.startRunners(ctx) + require.NoError(t, err) + + gaugeSetForExecutingTask := make(chan struct{}, 1) + gaugeSetForNonExecutingTask := make(chan struct{}, 1) + + registry.GetGauge( + "totalHangingTaskCount", + map[string]string{"type": "tasks.hanging"}, + ).On("Set", float64(0)).Return(mock.Anything).Run( + func(args mock.Arguments) { + select { + case gaugeSetForExecutingTask <- struct{}{}: + default: + } + }, + ) + + registry.GetGauge( + "totalHangingTaskCount", + map[string]string{"type": "long"}, + ).On("Set", float64(0)).Return(mock.Anything).Run( + func(args mock.Arguments) { + select { + case gaugeSetForNonExecutingTask <- struct{}{}: + default: + } + }, + ) + + select { + case <-gaugeSetForExecutingTask: + } + select { + case <-gaugeSetForNonExecutingTask: + } + + registry.AssertAllExpectations(t) +} + +func TestHangingTasksMetricsCleanup(t *testing.T) { + ctx, cancel := context.WithCancel(newContext()) + 
+ defer cancel() + + db, err := newYDB(ctx) + require.NoError(t, err) + // Do not defer db.Close(ctx) in this test. + + registry := mocks.NewIgnoreUnknownCallsRegistryMock() + + config := newHangingTaskTestConfig() + + s := createServicesWithConfig(t, ctx, db, config, registry) + err = registerHangingTask(s.registry) + require.NoError(t, err) + + err = s.startRunners(ctx) + require.NoError(t, err) + + reqCtx := getRequestContext(t, ctx) + taskID, err := scheduleHangingTask(reqCtx, s.scheduler) + require.NoError(t, err) + + gaugeSetWg := sync.WaitGroup{} + gaugeUnsetWg := sync.WaitGroup{} + + totalHangingTaskCountGaugeSetCall := registry.GetGauge( + "totalHangingTaskCount", + map[string]string{"type": "tasks.hanging"}, + ).On( + "Set", + float64(1), + ).Return(mock.Anything) + + gaugeSetWg.Add(1) + hangingTasksGaugeSetCall := registry.GetGauge( + "hangingTasks", + map[string]string{"type": "tasks.hanging", "id": taskID}, + ).On("Set", float64(1)).Return(mock.Anything).Run( + func(args mock.Arguments) { + gaugeSetWg.Done() + }, + ) + + registry.GetGauge( + "totalHangingTaskCount", + map[string]string{"type": "tasks.hanging"}, + ).On( + "Set", + float64(0), + ).NotBefore( + totalHangingTaskCountGaugeSetCall, + ).Return(mock.Anything) + + gaugeUnsetWg.Add(1) + registry.GetGauge( + "hangingTasks", + map[string]string{"type": "tasks.hanging", "id": taskID}, + ).On( + "Set", + float64(0), + ).NotBefore( + hangingTasksGaugeSetCall, + ).Return(mock.Anything).Run( + func(args mock.Arguments) { + gaugeUnsetWg.Done() + }, + ) + + gaugeSetWg.Wait() + // Close the connection to YDB to force collectListerMetricsTask to fail. 
+ err = db.Close(ctx) + require.NoError(t, err) + gaugeUnsetWg.Wait() + registry.AssertAllExpectations(t) +} diff --git a/cloud/tasks/ya.make b/cloud/tasks/ya.make index 000ae3b100d..c581fe7b3d7 100644 --- a/cloud/tasks/ya.make +++ b/cloud/tasks/ya.make @@ -7,6 +7,7 @@ SRCS( controller.go execution_context.go lister.go + register_system_tasks.go registry.go runner.go runner_metrics.go