From c2ac4bfd9dffb02f575186eb0ec38d0bd73e62cf Mon Sep 17 00:00:00 2001 From: Mikhail Montsev Date: Fri, 26 Jan 2024 15:26:49 +0000 Subject: [PATCH] NBSNEBIUS-89: [Tasks] add 'crontab' parameters to TaskSchedule (not supported yet); refactor tasks/storage.CreateRegularTasks --- cloud/tasks/scheduler.go | 6 ++++++ cloud/tasks/scheduler_impl.go | 3 +++ cloud/tasks/storage/storage.go | 6 ++++++ cloud/tasks/storage/storage_ydb_impl.go | 20 +++++++------------- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/cloud/tasks/scheduler.go b/cloud/tasks/scheduler.go index 316808985f9..ee070978d1f 100644 --- a/cloud/tasks/scheduler.go +++ b/cloud/tasks/scheduler.go @@ -13,6 +13,12 @@ import ( type TaskSchedule struct { ScheduleInterval time.Duration MaxTasksInflight int + + // Crontab params. + // Schedules task every day - only 'hour' and 'min' are supported. + UseCrontab bool // If set, ScheduleInterval is ignored. + Hour int // (0 - 23) + Min int // (0 - 59) } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/tasks/scheduler_impl.go b/cloud/tasks/scheduler_impl.go index cefb1169723..71ddf027103 100644 --- a/cloud/tasks/scheduler_impl.go +++ b/cloud/tasks/scheduler_impl.go @@ -155,6 +155,9 @@ func (s *scheduler) ScheduleRegularTasks( schedule := tasks_storage.TaskSchedule{ ScheduleInterval: schedule.ScheduleInterval, MaxTasksInflight: schedule.MaxTasksInflight, + + Hour: schedule.Hour, + Min: schedule.Min, } err = s.storage.CreateRegularTasks(ctx, tasks_storage.TaskState{ diff --git a/cloud/tasks/storage/storage.go b/cloud/tasks/storage/storage.go index 19791f54304..afac182f9f0 100644 --- a/cloud/tasks/storage/storage.go +++ b/cloud/tasks/storage/storage.go @@ -258,6 +258,12 @@ type TaskInfo struct { type TaskSchedule struct { ScheduleInterval time.Duration MaxTasksInflight int + + // Crontab params. + // Schedules task every day - only 'hour' and 'min' are supported. + UseCrontab bool // If set, ScheduleInterval is ignored. + Hour int // (0 - 23) + Min int // (0 - 59) } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/tasks/storage/storage_ydb_impl.go b/cloud/tasks/storage/storage_ydb_impl.go index caefd2c12f1..492cd84fc80 100644 --- a/cloud/tasks/storage/storage_ydb_impl.go +++ b/cloud/tasks/storage/storage_ydb_impl.go @@ -702,21 +702,19 @@ func (s *storageYDB) createRegularTasks( } } - scheduled := false + shouldSchedule := false if found { schedulingTime := schState.scheduledAt.Add(schedule.ScheduleInterval) if schState.tasksInflight == 0 && state.CreatedAt.After(schedulingTime) { - err := s.addRegularTasks(ctx, tx, state, schedule.MaxTasksInflight) - if err != nil { - return err - } - - schState.tasksInflight = uint64(schedule.MaxTasksInflight) - scheduled = true + shouldSchedule = true } } else { + shouldSchedule = true + } + + if shouldSchedule { err := s.addRegularTasks(ctx, tx, state, schedule.MaxTasksInflight) if err != nil { return err @@ -724,10 +722,6 @@ func (s *storageYDB) createRegularTasks( schState.taskType = state.TaskType schState.tasksInflight = uint64(schedule.MaxTasksInflight) - scheduled = true - } - - if scheduled { schState.scheduledAt = state.CreatedAt _, err = tx.Execute(ctx, fmt.Sprintf(` @@ -754,7 +748,7 @@ func (s *storageYDB) createRegularTasks( return err } - if scheduled { + if shouldSchedule { s.metrics.OnTaskCreated(state, schedule.MaxTasksInflight) }