Skip to content

Commit

Permalink
NBSNEBIUS-89: [Tasks] add 'crontab' parameters to TaskSchedule (not s…
Browse files Browse the repository at this point in the history
…upported yet); refactor tasks/storage.CreateRegularTasks
  • Loading branch information
Mikhail Montsev committed Jan 26, 2024
1 parent 01ea147 commit c2ac4bf
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 13 deletions.
6 changes: 6 additions & 0 deletions cloud/tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
type TaskSchedule struct {
ScheduleInterval time.Duration
MaxTasksInflight int

// Crontab params.
// Schedules task every day - only 'hour' and 'min' are supported.
UseCrontab bool // If set, ScheduleInterval is ignored.
Hour int // (0 - 23)
Min int // (0 - 59)
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
3 changes: 3 additions & 0 deletions cloud/tasks/scheduler_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ func (s *scheduler) ScheduleRegularTasks(
schedule := tasks_storage.TaskSchedule{
ScheduleInterval: schedule.ScheduleInterval,
MaxTasksInflight: schedule.MaxTasksInflight,

Hour: schedule.Hour,
Min: schedule.Min,
}

err = s.storage.CreateRegularTasks(ctx, tasks_storage.TaskState{
Expand Down
6 changes: 6 additions & 0 deletions cloud/tasks/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ type TaskInfo struct {
type TaskSchedule struct {
ScheduleInterval time.Duration
MaxTasksInflight int

// Crontab params.
// Schedules task every day - only 'hour' and 'min' are supported.
UseCrontab bool // If set, ScheduleInterval is ignored.
Hour int // (0 - 23)
Min int // (0 - 59)
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
20 changes: 7 additions & 13 deletions cloud/tasks/storage/storage_ydb_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,32 +702,26 @@ func (s *storageYDB) createRegularTasks(
}
}

scheduled := false
shouldSchedule := false

if found {
schedulingTime := schState.scheduledAt.Add(schedule.ScheduleInterval)

if schState.tasksInflight == 0 && state.CreatedAt.After(schedulingTime) {
err := s.addRegularTasks(ctx, tx, state, schedule.MaxTasksInflight)
if err != nil {
return err
}

schState.tasksInflight = uint64(schedule.MaxTasksInflight)
scheduled = true
shouldSchedule = true
}
} else {
shouldSchedule = true
}

if shouldSchedule {
err := s.addRegularTasks(ctx, tx, state, schedule.MaxTasksInflight)
if err != nil {
return err
}

schState.taskType = state.TaskType
schState.tasksInflight = uint64(schedule.MaxTasksInflight)
scheduled = true
}

if scheduled {
schState.scheduledAt = state.CreatedAt

_, err = tx.Execute(ctx, fmt.Sprintf(`
Expand All @@ -754,7 +748,7 @@ func (s *storageYDB) createRegularTasks(
return err
}

if scheduled {
if shouldSchedule {
s.metrics.OnTaskCreated(state, schedule.MaxTasksInflight)
}

Expand Down

0 comments on commit c2ac4bf

Please sign in to comment.