diff --git a/cloud/tasks/config/config.proto b/cloud/tasks/config/config.proto index 28cba666374..765c208b79c 100644 --- a/cloud/tasks/config/config.proto +++ b/cloud/tasks/config/config.proto @@ -38,11 +38,11 @@ message TasksConfig { repeated string ZoneIds = 23; optional string HearbeatInterval = 24 [default = "30s"]; - // The time window within which the node is considered alive. + // The time window within which the node is considered alive. optional string LivenessWindow = 25 [default = "30s"]; // Feature flag for enabling node eviction policy based - // on the health of the node and the number of running nodes. + // on the health of the node and the number of running nodes. optional bool NodeEvictionEnabled = 26 [default = false]; map InflightTaskPerNodeLimits = 27; // by task type @@ -50,5 +50,6 @@ message TasksConfig { optional string CollectListerMetricsTaskScheduleInterval = 28 [default = "10m"]; optional string ListerMetricsCollectionInterval = 29 [default = "1m"]; optional int64 MaxHangingTaskIDsToReport = 30 [default = 1000]; - optional uint64 MissedEstimatesUntilHanging = 31 [default = 2]; + // Number of missed estimated durations until task is considered hanging. + optional uint64 MissedEstimatesUntilTaskIsHanging = 31 [default = 2]; } diff --git a/cloud/tasks/execution_context.go b/cloud/tasks/execution_context.go index b60ae4b4b6e..1dc2fd266b1 100644 --- a/cloud/tasks/execution_context.go +++ b/cloud/tasks/execution_context.go @@ -30,7 +30,7 @@ type ExecutionContext interface { // Dependencies are automatically added by Scheduler.WaitTask. AddTaskDependency(ctx context.Context, taskID string) error - GetDeadline() time.Time + IsHanging() bool SetEstimate(estimatedDuration time.Duration) @@ -45,13 +45,14 @@ type ExecutionContext interface { //////////////////////////////////////////////////////////////////////////////// type executionContext struct { - task Task - storage storage.Storage - taskState storage.TaskState - taskStateMutex sync.Mutex - finished bool - hangingTaskTimeout time.Duration - missedEstimatesUntilHanging uint64 + task Task + storage storage.Storage + taskState storage.TaskState + taskStateMutex sync.Mutex + finished bool + + hangingTaskTimeout time.Duration + missedEstimatesUntilTaskIsHanging uint64 } // HACK from https://github.com/stretchr/testify/pull/694/files to avoid fake race detection @@ -116,26 +117,28 @@ func (c *executionContext) AddTaskDependency( }) } -func (c *executionContext) GetDeadline() time.Time { +func (c *executionContext) IsHanging() bool { c.taskStateMutex.Lock() defer c.taskStateMutex.Unlock() - var estimatedDuration time.Duration + now := time.Now() defaultDeadline := c.taskState.CreatedAt.Add(c.hangingTaskTimeout) + + var estimatedDuration time.Duration if c.taskState.EstimatedTime.After(c.taskState.CreatedAt) { estimatedDuration = c.taskState.EstimatedTime.Sub(c.taskState.CreatedAt) } else { - return defaultDeadline + return defaultDeadline.After(now) } deadline := c.taskState.CreatedAt.Add( - estimatedDuration * time.Duration(c.missedEstimatesUntilHanging), + estimatedDuration * time.Duration(c.missedEstimatesUntilTaskIsHanging), ) if deadline.Before(defaultDeadline) { - return defaultDeadline + return defaultDeadline.After(now) } - return deadline + return deadline.After(now) } func (c *executionContext) SetEstimate(estimatedDuration time.Duration) { @@ -359,15 +362,16 @@ func newExecutionContext( storage storage.Storage, taskState storage.TaskState, hangingTaskTimeout time.Duration, - missedEstimatesUntilHanging uint64, + missedEstimatesUntilTaskIsHanging uint64, ) *executionContext { return &executionContext{ - task: task, - storage: storage, - taskState: taskState, - hangingTaskTimeout: hangingTaskTimeout, - missedEstimatesUntilHanging: missedEstimatesUntilHanging, + task: task, + storage: storage, + taskState: taskState, + + hangingTaskTimeout: hangingTaskTimeout, + missedEstimatesUntilTaskIsHanging: missedEstimatesUntilTaskIsHanging, } } diff --git a/cloud/tasks/mocks/execution_context_mock.go b/cloud/tasks/mocks/execution_context_mock.go index f08e69e6db1..7ecdb9ae0a9 100644 --- a/cloud/tasks/mocks/execution_context_mock.go +++ b/cloud/tasks/mocks/execution_context_mock.go @@ -48,9 +48,9 @@ func (c *ExecutionContextMock) AddTaskDependency( return args.Error(0) } -func (c *ExecutionContextMock) GetDeadline() time.Time { - c.Called() - return time.Time{} +func (c *ExecutionContextMock) IsHanging() bool { + args := c.Called() + return args.Bool(0) } func (c *ExecutionContextMock) SetEstimate(estimatedDuration time.Duration) { diff --git a/cloud/tasks/runner.go b/cloud/tasks/runner.go index 8147312e56d..ef8d5594866 100644 --- a/cloud/tasks/runner.go +++ b/cloud/tasks/runner.go @@ -70,18 +70,19 @@ type runner interface { //////////////////////////////////////////////////////////////////////////////// type runnerForRun struct { - storage storage.Storage - registry *Registry - metrics runnerMetrics - channel *channel - pingPeriod time.Duration - pingTimeout time.Duration - host string - id string - maxRetriableErrorCount uint64 - maxPanicCount uint64 - hangingTaskTimeout time.Duration - missedEstimatesUntilHanging uint64 + storage storage.Storage + registry *Registry + metrics runnerMetrics + channel *channel + pingPeriod time.Duration + pingTimeout time.Duration + host string + id string + maxRetriableErrorCount uint64 + maxPanicCount uint64 + + hangingTaskTimeout time.Duration + missedEstimatesUntilTaskIsHanging uint64 } func (r *runnerForRun) receiveTask( @@ -319,23 +320,24 @@ func (r *runnerForRun) lockAndExecuteTask( r, taskInfo, r.hangingTaskTimeout, - r.missedEstimatesUntilHanging, + r.missedEstimatesUntilTaskIsHanging, ) } //////////////////////////////////////////////////////////////////////////////// type runnerForCancel struct { - storage storage.Storage - registry *Registry - metrics runnerMetrics - channel *channel - pingPeriod time.Duration - pingTimeout time.Duration - host string - id string - hangingTaskTimeout time.Duration - missedEstimatesUntilHanging uint64 + storage storage.Storage + registry *Registry + metrics runnerMetrics + channel *channel + pingPeriod time.Duration + pingTimeout time.Duration + host string + id string + + hangingTaskTimeout time.Duration + missedEstimatesUntilTaskIsHanging uint64 } func (r *runnerForCancel) receiveTask( @@ -445,7 +447,7 @@ func (r *runnerForCancel) lockAndExecuteTask( r, taskInfo, r.hangingTaskTimeout, - r.missedEstimatesUntilHanging, + r.missedEstimatesUntilTaskIsHanging, ) } @@ -512,7 +514,7 @@ func lockAndExecuteTask( runner runner, taskInfo storage.TaskInfo, hangingTaskTimeout time.Duration, - missedEstimatesUntilHanging uint64, + missedEstimatesUntilTaskIsHanging uint64, ) error { taskState, err := runner.lockTask(ctx, taskInfo) @@ -581,7 +583,7 @@ func lockAndExecuteTask( taskStorage, taskState, hangingTaskTimeout, - missedEstimatesUntilHanging, + missedEstimatesUntilTaskIsHanging, ) pingCtx, cancelPing := context.WithCancel(ctx) @@ -643,7 +645,7 @@ func startRunner( idForCancel string, maxRetriableErrorCount uint64, maxPanicCount uint64, - missedEstimatesUntilHanging uint64, + missedEstimatesUntilTaskIsHanging uint64, ) error { // TODO: More granular control on runners and cancellers. @@ -656,18 +658,19 @@ func startRunner( ) go runnerLoop(ctx, registry, &runnerForRun{ - storage: taskStorage, - registry: registry, - metrics: runnerForRunMetrics, - channel: channelForRun, - pingPeriod: pingPeriod, - pingTimeout: pingTimeout, - host: host, - id: idForRun, - maxRetriableErrorCount: maxRetriableErrorCount, - maxPanicCount: maxPanicCount, - hangingTaskTimeout: hangingTaskTimeout, - missedEstimatesUntilHanging: missedEstimatesUntilHanging, + storage: taskStorage, + registry: registry, + metrics: runnerForRunMetrics, + channel: channelForRun, + pingPeriod: pingPeriod, + pingTimeout: pingTimeout, + host: host, + id: idForRun, + maxRetriableErrorCount: maxRetriableErrorCount, + maxPanicCount: maxPanicCount, + + hangingTaskTimeout: hangingTaskTimeout, + missedEstimatesUntilTaskIsHanging: missedEstimatesUntilTaskIsHanging, }) runnerForCancelMetrics := newRunnerMetrics( @@ -678,16 +681,17 @@ func startRunner( ) go runnerLoop(ctx, registry, &runnerForCancel{ - storage: taskStorage, - registry: registry, - metrics: runnerForCancelMetrics, - channel: channelForCancel, - pingPeriod: pingPeriod, - pingTimeout: pingTimeout, - host: host, - id: idForCancel, - hangingTaskTimeout: hangingTaskTimeout, - missedEstimatesUntilHanging: missedEstimatesUntilHanging, + storage: taskStorage, + registry: registry, + metrics: runnerForCancelMetrics, + channel: channelForCancel, + pingPeriod: pingPeriod, + pingTimeout: pingTimeout, + host: host, + id: idForCancel, + + hangingTaskTimeout: hangingTaskTimeout, + missedEstimatesUntilTaskIsHanging: missedEstimatesUntilTaskIsHanging, }) return nil @@ -708,7 +712,7 @@ func startRunners( host string, maxRetriableErrorCount uint64, maxPanicCount uint64, - missedEstimatesUntilHanging uint64, + missedEstimatesUntilTaskIsHanging uint64, ) error { for i := uint64(0); i < runnerCount; i++ { @@ -728,7 +732,7 @@ func startRunners( fmt.Sprintf("cancel_%v", i), maxRetriableErrorCount, maxPanicCount, - missedEstimatesUntilHanging, + missedEstimatesUntilTaskIsHanging, ) if err != nil { return fmt.Errorf("failed to start runner #%d: %w", i, err) @@ -753,7 +757,7 @@ func startStalkingRunners( host string, maxRetriableErrorCount uint64, maxPanicCount uint64, - missedEstimatesUntilHanging uint64, + missedEstimatesUntilTaskIsHanging uint64, ) error { for i := uint64(0); i < runnerCount; i++ { @@ -773,7 +777,7 @@ func startStalkingRunners( fmt.Sprintf("stalker_cancel_%v", i), maxRetriableErrorCount, maxPanicCount, - missedEstimatesUntilHanging, + missedEstimatesUntilTaskIsHanging, ) if err != nil { return fmt.Errorf("failed to start stalking runner #%d: %w", i, err) @@ -924,7 +928,7 @@ func StartRunners( host, config.GetMaxRetriableErrorCount(), config.GetMaxPanicCount(), - config.GetMissedEstimatesUntilHanging(), + config.GetMissedEstimatesUntilTaskIsHanging(), ) if err != nil { return err @@ -976,7 +980,7 @@ func StartRunners( host, config.GetMaxRetriableErrorCount(), config.GetMaxPanicCount(), - config.GetMissedEstimatesUntilHanging(), + config.GetMissedEstimatesUntilTaskIsHanging(), ) if err != nil { return err diff --git a/cloud/tasks/runner_metrics.go b/cloud/tasks/runner_metrics.go index 48a59342e6b..3a0b877ba5e 100644 --- a/cloud/tasks/runner_metrics.go +++ b/cloud/tasks/runner_metrics.go @@ -112,11 +112,11 @@ func (m *runnerMetricsImpl) OnExecutionStarted(execCtx ExecutionContext) { return case <-time.After(checkTaskHangingPeriod): } - m.checkTaskHanging(ctx, execCtx.GetDeadline()) + m.setTaskHanging(ctx, execCtx.IsHanging()) } }() - m.checkTaskHangingImpl(execCtx.GetDeadline()) + m.setTaskHangingImpl(execCtx.IsHanging()) } m.taskMetrics.inflightTasksGauge.Add(1) @@ -131,7 +131,7 @@ func (m *runnerMetricsImpl) OnExecutionStopped() { return } - m.setTaskHanging(false) + m.setTaskHangingImpl(false) m.taskMetrics.inflightTasksGauge.Add(-1) m.taskMetrics = nil @@ -187,7 +187,7 @@ func (m *runnerMetricsImpl) OnError(err error) { //////////////////////////////////////////////////////////////////////////////// -func (m *runnerMetricsImpl) setTaskHanging(value bool) { +func (m *runnerMetricsImpl) setTaskHangingImpl(value bool) { prevValue := m.taskMetrics.isTaskHanging m.taskMetrics.isTaskHanging = value @@ -204,19 +204,7 @@ func (m *runnerMetricsImpl) setTaskHanging(value bool) { } } -func (m *runnerMetricsImpl) checkTaskHangingImpl(deadline time.Time) { - if m.taskMetrics == nil { - return - } - - m.setTaskHanging(time.Now().After(deadline)) -} - -func (m *runnerMetricsImpl) checkTaskHanging( - ctx context.Context, - deadline time.Time, -) { - +func (m *runnerMetricsImpl) setTaskHanging(ctx context.Context, value bool) { m.taskMetricsMutex.Lock() defer m.taskMetricsMutex.Unlock() @@ -224,7 +212,7 @@ func (m *runnerMetricsImpl) checkTaskHanging( return } - m.checkTaskHangingImpl(deadline) + m.setTaskHangingImpl(value) } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/tasks/scheduler_test.go b/cloud/tasks/scheduler_test.go index 1f6fe77bffb..9ea50a5b1d0 100644 --- a/cloud/tasks/scheduler_test.go +++ b/cloud/tasks/scheduler_test.go @@ -86,9 +86,9 @@ func (c *executionContextMock) AddTaskDependency( return args.Error(0) } -func (c *executionContextMock) GetDeadline() time.Time { - c.Called() - return time.Time{} +func (c *executionContextMock) IsHanging() bool { + args := c.Called() + return args.Bool(0) } func (c *executionContextMock) SetEstimate(estimatedDuration time.Duration) { diff --git a/cloud/tasks/storage/compound_storage.go b/cloud/tasks/storage/compound_storage.go index 227e800d162..79d8c19ffdf 100644 --- a/cloud/tasks/storage/compound_storage.go +++ b/cloud/tasks/storage/compound_storage.go @@ -467,16 +467,17 @@ func NewStorage( newStorage := func(storageFolder string, metrics storageMetrics) *storageYDB { return &storageYDB{ - db: db, - folder: storageFolder, - tablesPath: db.AbsolutePath(storageFolder), - taskStallingTimeout: taskStallingTimeout, - updateTaskTimeout: updateTaskTimeout, - livenessWindow: livenessWindow, - ZoneIDs: config.GetZoneIds(), - metrics: metrics, - hangingTaskTimeout: hangingTaskTimeout, - missedEstimatesUntilHanging: config.GetMissedEstimatesUntilHanging(), + db: db, + folder: storageFolder, + tablesPath: db.AbsolutePath(storageFolder), + taskStallingTimeout: taskStallingTimeout, + updateTaskTimeout: updateTaskTimeout, + livenessWindow: livenessWindow, + ZoneIDs: config.GetZoneIds(), + metrics: metrics, + + hangingTaskTimeout: hangingTaskTimeout, + missedEstimatesUntilTaskIsHanging: config.GetMissedEstimatesUntilTaskIsHanging(), } } diff --git a/cloud/tasks/storage/storage_ydb.go b/cloud/tasks/storage/storage_ydb.go index 7b3293df2ad..e050b87bd43 100644 --- a/cloud/tasks/storage/storage_ydb.go +++ b/cloud/tasks/storage/storage_ydb.go @@ -10,16 +10,17 @@ import ( //////////////////////////////////////////////////////////////////////////////// type storageYDB struct { - db *persistence.YDBClient - folder string - tablesPath string - taskStallingTimeout time.Duration - updateTaskTimeout time.Duration - livenessWindow time.Duration - ZoneIDs []string - metrics storageMetrics - hangingTaskTimeout time.Duration - missedEstimatesUntilHanging uint64 + db *persistence.YDBClient + folder string + tablesPath string + taskStallingTimeout time.Duration + updateTaskTimeout time.Duration + livenessWindow time.Duration + ZoneIDs []string + metrics storageMetrics + + hangingTaskTimeout time.Duration + missedEstimatesUntilTaskIsHanging uint64 } func (s *storageYDB) CreateTask( diff --git a/cloud/tasks/storage/storage_ydb_impl.go b/cloud/tasks/storage/storage_ydb_impl.go index 3c88c80bcb9..6c54f529e9b 100644 --- a/cloud/tasks/storage/storage_ydb_impl.go +++ b/cloud/tasks/storage/storage_ydb_impl.go @@ -942,9 +942,9 @@ func (s *storageYDB) listHangingTasks( declare $limit as Uint64; declare $except_task_types as List; declare $hanging_task_timeout as Interval; - declare $missed_estimates_until_hanging as Uint64; + declare $missed_estimates_until_task_is_hanging as Uint64; declare $now as Timestamp; - + $task_ids = ( select id from ready_to_run UNION ALL select id from running UNION ALL @@ -952,17 +952,17 @@ func (s *storageYDB) listHangingTasks( select id from cancelling ); select * from tasks - where id in $task_ids and + where id in $task_ids and ( (ListLength($except_task_types) == 0) or (task_type not in $except_task_types) - ) and + ) and ( - (estimated_time == DateTime::FromSeconds(0) and $now >= created_at + $hanging_task_timeout) or + (estimated_time == DateTime::FromSeconds(0) and $now >= created_at + $hanging_task_timeout) or ( estimated_time > created_at and $now >= MAX_OF( - created_at + (estimated_time - created_at) * $missed_estimates_until_hanging, + created_at + (estimated_time - created_at) * $missed_estimates_until_task_is_hanging, created_at + $hanging_task_timeout ) ) @@ -978,8 +978,8 @@ func (s *storageYDB) listHangingTasks( persistence.IntervalValue(s.hangingTaskTimeout), ), persistence.ValueParam( - "$missed_estimates_until_hanging", - persistence.Uint64Value(s.missedEstimatesUntilHanging), + "$missed_estimates_until_task_is_hanging", + persistence.Uint64Value(s.missedEstimatesUntilTaskIsHanging), ), persistence.ValueParam("$now", persistence.TimestampValue(now)), )