From e434137751184938472e61fc6175d61552d0d2c4 Mon Sep 17 00:00:00 2001
From: zhangjian16
Date: Fri, 18 Oct 2024 10:52:38 +0800
Subject: [PATCH] [YUNIKORN-2879] [shim] yunikorn unschedulable pods pending
 forever: retry tasks that fail post-allocation or are rejected
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
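
Unschedulable pods could previously stay pending forever: when a task failed
pod-volume binding, failed node binding, or was rejected by the scheduler, it
was moved straight to the Failed state. With this change the shim retries the
task first: the allocation is released, the task goes back to Pending after a
retry interval, and it is scheduled again until the retry budget is used up.

The retry budget is configurable per pod through annotations and defaults to
5 attempts at a 5s interval. Illustrative pod spec (the two annotation keys
are the ones added by this patch; all other values are placeholders):

    apiVersion: v1
    kind: Pod
    metadata:
      name: sleep-pod
      annotations:
        yunikorn.apache.org/task-retry-num: "3"
        yunikorn.apache.org/task-retry-interval: "2s"
    spec:
      schedulerName: yunikorn
      containers:
        - name: sleep
          image: alpine:latest
          command: ["sleep", "60"]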
---
 pkg/cache/task.go                 |  84 ++++++++---
 pkg/cache/task_state.go           |  46 +++++-
 pkg/common/constants/constants.go |   8 ++
 pkg/common/utils/utils.go         |  22 +++
 pkg/shim/scheduler_mock_test.go   |  31 +++-
 pkg/shim/scheduler_test.go        | 231 +++++++++++++++++++++++++++++-
 6 files changed, 397 insertions(+), 25 deletions(-)

diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 8b758b6f6..9c85fed89 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -40,17 +40,20 @@ import (
 )
 
 type Task struct {
-	taskID        string
-	alias         string
-	applicationID string
-	application   *Application
-	podStatus     v1.PodStatus // pod status, maintained separately for efficiency reasons
-	context       *Context
-	createTime    time.Time
-	placeholder   bool
-	originator    bool
-	sm            *fsm.FSM
-
+	taskID            string
+	alias             string
+	applicationID     string
+	application       *Application
+	podStatus         v1.PodStatus // pod status, maintained separately for efficiency reasons
+	context           *Context
+	createTime        time.Time
+	retryTimer        *time.Timer // timer for task retry
+	placeholder       bool
+	originator        bool
+	sm                *fsm.FSM
+	retryTimeInterval time.Duration // interval between retry attempts
+	retryNum          int           // maximum number of retry attempts
+	attempt           int           // retry attempts consumed so far
 	// mutable resources, require locking
 	allocationKey string
 	nodeName      string
@@ -109,6 +112,9 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource,
 	if tgName := utils.GetTaskGroupFromPodSpec(pod); tgName != "" {
 		task.taskGroupName = tgName
 	}
+	task.retryNum = utils.GetTaskRetryNumFromPodSpec(pod)
+	task.retryTimeInterval = utils.GetTaskRetryTimeIntervalFromPodSpec(pod)
+
 	task.initialize()
 	return task
 }
@@ -141,6 +147,13 @@ func (task *Task) GetTaskID() string {
 	return task.taskID
 }
 
+func (task *Task) GetTaskRetryNum() int {
+	return task.retryNum
+}
+func (task *Task) GetTaskRetryTimeInterval() time.Duration {
+	return task.retryTimeInterval
+}
+
 func (task *Task) IsPlaceholder() bool {
 	return task.placeholder
 }
@@ -375,8 +388,7 @@ func (task *Task) postTaskAllocated() {
 		zap.String("podName", task.pod.Name),
 		zap.String("podUID", string(task.pod.UID)))
 	if err := task.context.bindPodVolumes(task.pod); err != nil {
-		log.Log(log.ShimCacheTask).Error("bind volumes to pod failed", zap.String("taskID", task.taskID), zap.Error(err))
-		task.failWithEvent(fmt.Sprintf("bind volumes to pod failed, name: %s, %s", task.alias, err.Error()), "PodVolumesBindFailure")
+		task.RetryThenFailTask(fmt.Sprintf("bind volumes to pod failed, name: %s, %s", task.alias, err.Error()), "PodVolumesBindFailure")
 		return
 	}
 
@@ -385,8 +397,7 @@ func (task *Task) postTaskAllocated() {
 		zap.String("podUID", string(task.pod.UID)))
 
 	if err := task.context.apiProvider.GetAPIs().KubeClient.Bind(task.pod, task.nodeName); err != nil {
-		log.Log(log.ShimCacheTask).Error("bind pod to node failed", zap.String("taskID", task.taskID), zap.Error(err))
-		task.failWithEvent(fmt.Sprintf("bind pod to node failed, name: %s, %s", task.alias, err.Error()), "PodBindFailure")
+		task.RetryThenFailTask(fmt.Sprintf("bind pod to node failed, name: %s, %s", task.alias, err.Error()), "PodBindFailure")
 		return
 	}
 
@@ -453,12 +464,8 @@ func (task *Task) postTaskBound() {
 }
 
 func (task *Task) postTaskRejected() {
-	// currently, once task is rejected by scheduler, we directly move task to failed state.
-	// so this function simply triggers the state transition when it is rejected.
-	// but further, we can introduce retry mechanism if necessary.
-	dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
-		fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))
-
+	// once a task is rejected by the scheduler, retry it before moving it to the failed state
+	task.RetryThenFailTask(fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias), "TaskRejected")
 	events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeWarning, "TaskRejected",
 		"TaskRejected",
 		"Task %s is rejected by the scheduler", task.alias)
@@ -655,3 +662,38 @@ func (task *Task) SetTaskPod(pod *v1.Pod) {
 		task.updateAllocation()
 	}
 }
+
+// setRetryTimer arms the retry timer; when it fires, the retry event is
+// dispatched only if the task is still in the state captured here.
+func (task *Task) setRetryTimer(timeout time.Duration, currentState string, event RetryTaskEvent) {
+	log.Log(log.ShimCacheTask).Debug("task retry timer initiated",
+		zap.String("appID", task.applicationID),
+		zap.String("taskID", task.taskID),
+		zap.String("state", task.sm.Current()),
+		zap.Duration("timeout", timeout))
+
+	task.retryTimer = time.AfterFunc(timeout, task.timeoutRetryTimer(currentState, event))
+}
+
+func (task *Task) timeoutRetryTimer(expectedState string, event RetryTaskEvent) func() {
+	return func() {
+		task.lock.Lock()
+		defer task.lock.Unlock()
+		if expectedState == task.sm.Current() {
+			dispatcher.Dispatch(event)
+		}
+	}
+}
+
+// RetryThenFailTask schedules another attempt while the task still has retry
+// budget left, otherwise it moves the task to the failed state.
+func (task *Task) RetryThenFailTask(errorMessage, actionReason string) {
+	if task.attempt < task.retryNum {
+		log.Log(log.ShimCacheTask).Info("task failed, task will retry", zap.String("taskID", task.taskID), zap.Int("attempt", task.attempt), zap.Int("retryNum", task.retryNum), zap.Duration("retryTimeInterval", task.retryTimeInterval), zap.String("errorMessage", errorMessage))
+		task.attempt++
+		task.setRetryTimer(task.retryTimeInterval, task.sm.Current(), NewRetryTaskEvent(task.applicationID, task.taskID, "retrying task"))
+	} else {
+		log.Log(log.ShimCacheTask).Error("task failed", zap.String("taskID", task.taskID), zap.String("errorMessage", errorMessage))
+		task.failWithEvent(errorMessage, actionReason)
+	}
+}
diff --git a/pkg/cache/task_state.go b/pkg/cache/task_state.go
index d244fa8d1..ef5900342 100644
--- a/pkg/cache/task_state.go
+++ b/pkg/cache/task_state.go
@@ -47,10 +47,11 @@ const (
 	TaskFail
 	KillTask
 	TaskKilled
+	TaskRetry
 )
 
 func (ae TaskEventType) String() string {
-	return [...]string{"InitTask", "SubmitTask", "TaskAllocated", "TaskRejected", "TaskBound", "CompleteTask", "TaskFail", "KillTask", "TaskKilled"}[ae]
+	return [...]string{"InitTask", "SubmitTask", "TaskAllocated", "TaskRejected", "TaskBound", "CompleteTask", "TaskFail", "KillTask", "TaskKilled", "TaskRetry"}[ae]
 }
 
 // ------------------------
@@ -202,6 +203,13 @@ type FailTaskEvent struct {
 	message       string
 }
 
+type RetryTaskEvent struct {
+	applicationID string
+	taskID        string
+	event         TaskEventType
+	message       string
+}
+
 func NewFailTaskEvent(appID string, taskID string, failedMessage string) FailTaskEvent {
 	return FailTaskEvent{
 		applicationID: appID,
@@ -211,6 +219,33 @@ func NewFailTaskEvent(appID string, taskID string, failedMessage string) FailTaskEvent {
 	}
 }
 
+func NewRetryTaskEvent(appID string, taskID string, retryMessage string) RetryTaskEvent {
+	return RetryTaskEvent{
+		applicationID: appID,
+		taskID:        taskID,
+		event:         TaskRetry,
+		message:       retryMessage,
+	}
+}
+
+func (re RetryTaskEvent) GetEvent() string {
+	return re.event.String()
+}
+
+func (re RetryTaskEvent) GetArgs() []interface{} {
+	args := make([]interface{}, 1)
+	args[0] = re.message
+	return args
+}
+
+func (re RetryTaskEvent) GetTaskID() string {
+	return re.taskID
+}
+
+func (re RetryTaskEvent) GetApplicationID() string {
+	return re.applicationID
+}
+
 func (fe FailTaskEvent) GetEvent() string {
 	return fe.event.String()
 }
@@ -371,6 +406,11 @@ func eventDesc(states *TStates) fsm.Events {
 			Src:  []string{states.New, states.Pending, states.Scheduling, states.Rejected, states.Allocated},
 			Dst:  states.Failed,
 		},
+		{
+			Name: TaskRetry.String(),
+			Src:  []string{states.Rejected, states.Allocated},
+			Dst:  states.Pending,
+		},
 	}
 }
 
@@ -433,6 +473,10 @@ func callbacks(states *TStates) fsm.Callbacks {
 			task := event.Args[0].(*Task) //nolint:errcheck
 			task.beforeTaskCompleted()
 		},
+		beforeHook(TaskRetry): func(_ context.Context, event *fsm.Event) {
+			task := event.Args[0].(*Task) //nolint:errcheck
+			task.releaseAllocation()
+		},
 		SubmitTask.String(): func(_ context.Context, event *fsm.Event) {
 			task := event.Args[0].(*Task) //nolint:errcheck
 			task.handleSubmitTaskEvent()
diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go
index 09d9ea139..afc183a1b 100644
--- a/pkg/common/constants/constants.go
+++ b/pkg/common/constants/constants.go
@@ -20,6 +20,8 @@ package constants
 
 import (
+	"time"
+
 	siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
 // Common
@@ -56,6 +58,9 @@ const DefaultAppNamespace = "default"
 const DefaultUserLabel = DomainYuniKorn + "username"
 const DefaultUser = "nobody"
 
+const DefaultTaskRetryTimeInterval = 5 * time.Second
+const DefaultTaskRetryNum = 5
+
 // Spark
 const SparkLabelAppID = "spark-app-selector"
 
@@ -87,6 +92,9 @@ const SchedulingPolicyParamDelimiter = " "
 const SchedulingPolicyStyleParam = "gangSchedulingStyle"
 const SchedulingPolicyStyleParamDefault = "Soft"
 
+const AnnotationTaskRetryName = DomainYuniKorn + "task-retry-num"
+const AnnotationTaskRetryIntervalName = DomainYuniKorn + "task-retry-interval"
+
 var SchedulingPolicyStyleParamValues = map[string]string{"Hard": "Hard", "Soft": "Soft"}
 
 const ApplicationInsufficientResourcesFailure = "ResourceReservationTimeout"
diff --git a/pkg/common/utils/utils.go b/pkg/common/utils/utils.go
index f58c96e53..cc0ea0e3a 100644
--- a/pkg/common/utils/utils.go
+++ b/pkg/common/utils/utils.go
@@ -462,6 +462,28 @@ func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
 	return GetPodAnnotationValue(pod, constants.AnnotationTaskGroupName)
 }
 
+// GetTaskRetryNumFromPodSpec reads the task retry count from the pod
+// annotation, falling back to the default when it is absent or malformed.
+func GetTaskRetryNumFromPodSpec(pod *v1.Pod) int {
+	if value := GetPodAnnotationValue(pod, constants.AnnotationTaskRetryName); value != "" {
+		if v, err := strconv.Atoi(value); err == nil {
+			return v
+		}
+	}
+	return constants.DefaultTaskRetryNum
+}
+
+// GetTaskRetryTimeIntervalFromPodSpec reads the task retry interval from the
+// pod annotation, falling back to the default when it is absent or malformed.
+func GetTaskRetryTimeIntervalFromPodSpec(pod *v1.Pod) time.Duration {
+	if value := GetPodAnnotationValue(pod, constants.AnnotationTaskRetryIntervalName); value != "" {
+		if v, err := time.ParseDuration(value); err == nil {
+			return v
+		}
+	}
+	return constants.DefaultTaskRetryTimeInterval
+}
+
 func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
 	if value := GetPodAnnotationValue(pod, constants.AnnotationPlaceholderFlag); value != "" {
 		if v, err := strconv.ParseBool(value); err == nil {
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 7b8de8556..a69b7a01d 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -162,13 +162,42 @@ func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expe
 	}
 }
 
+func (fc *MockScheduler) waitAndAssertTaskStateWithRetryParam(t *testing.T, appID, taskID, expectedState string, retryNum int, retryTimeInterval time.Duration, timeInterval time.Duration) {
+	app := fc.context.GetApplication(appID)
+	assert.Equal(t, app != nil, true)
+	assert.Equal(t, app.GetApplicationID(), appID)
+
+	task := app.GetTask(taskID)
+	assert.Equal(t, task.GetTaskRetryNum(), retryNum)
+	assert.Equal(t, task.GetTaskRetryTimeInterval(), retryTimeInterval)
+	deadline := time.Now().Add(timeInterval)
+	for {
+		if task.GetTaskState() == expectedState {
+			break
+		}
+		log.Log(log.Test).Info("waiting for task state",
+			zap.String("expected", expectedState),
+			zap.String("actual", task.GetTaskState()))
+		time.Sleep(time.Second)
+		if time.Now().After(deadline) {
+			t.Errorf("task %s doesn't reach expected state in given time, expecting: %s, actual: %s",
+				taskID, expectedState, task.GetTaskState())
+			return
+		}
+	}
+}
+
 func (fc *MockScheduler) waitAndAssertTaskState(t *testing.T, appID, taskID, expectedState string) {
+	fc.waitAndAssertTaskStateAfterTimeInterval(t, appID, taskID, expectedState, 10*time.Second)
+}
+
+func (fc *MockScheduler) waitAndAssertTaskStateAfterTimeInterval(t *testing.T, appID, taskID, expectedState string, timeInterval time.Duration) {
 	app := fc.context.GetApplication(appID)
 	assert.Equal(t, app != nil, true)
 	assert.Equal(t, app.GetApplicationID(), appID)
 
 	task := app.GetTask(taskID)
-	deadline := time.Now().Add(10 * time.Second)
+	deadline := time.Now().Add(timeInterval)
 	for {
 		if task.GetTaskState() == expectedState {
 			break
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index e7ee19f3a..2e939a132 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -20,7 +20,9 @@ package shim
 
 import (
 	"fmt"
+	"strconv"
 	"testing"
+	"time"
 
 	"gotest.tools/v3/assert"
 	v1 "k8s.io/api/core/v1"
@@ -190,7 +192,7 @@ func TestSchedulerRegistrationFailed(t *testing.T) {
 	shim.Stop()
 }
 
-func TestTaskFailures(t *testing.T) {
+func TestTaskRetrySuccessWithDefaultRetryParam(t *testing.T) {
 	configData := `
 partitions:
   -
@@ -250,8 +252,79 @@ partitions:
 	// wait for scheduling app and tasks
 	// verify app state
 	cluster.waitAndAssertApplicationState(t, "app0001", cache.ApplicationStates().Running)
-	cluster.waitAndAssertTaskState(t, "app0001", "task0001", cache.TaskStates().Failed)
+	// retry will succeed
+	cluster.waitAndAssertTaskStateWithRetryParam(t, "app0001", "task0001", cache.TaskStates().Allocated, constants.DefaultTaskRetryNum, constants.DefaultTaskRetryTimeInterval, 10*time.Second)
+	cluster.waitAndAssertTaskState(t, "app0001", "task0002", cache.TaskStates().Bound)
+
+	// both tasks eventually hold allocations, so we expect 2 allocations in the scheduler
+	err = cluster.waitAndVerifySchedulerAllocations("root.a",
+		"[mycluster]default", "app0001", 2)
+	assert.NilError(t, err, "number of allocations is not expected, error")
+}
+
+func TestTaskRetryFailureDefaultRetryParam(t *testing.T) {
+	configData := `
+partitions:
+  -
+    name: default
+    queues:
+      -
+        name: root
+        submitacl: "*"
+        queues:
+          -
+            name: a
+            resources:
+              guaranteed:
+                memory: 100000000
+                vcore: 10
+              max:
+                memory: 100000000
+                vcore: 10
+`
+	// init and register scheduler
+	cluster := MockScheduler{}
+	cluster.init()
+	assert.NilError(t, cluster.start(), "failed to start cluster")
+	defer cluster.stop()
+
+	// mock pod bind failures
+	cluster.apiProvider.MockBindFn(func(pod *v1.Pod, hostID string) error {
+		if pod.Name == "task0001" {
+			return fmt.Errorf("mocked error when binding the pod")
+		}
+		return nil
+	})
+
+	err := cluster.updateConfig(configData, nil)
+	assert.NilError(t, err, "update config failed")
+
+	nodeLabels := map[string]string{
+		"label1": "key1",
+		"label2": "key2",
+	}
+	// register nodes
+	err = cluster.addNode("test.host.01", nodeLabels, 100000000, 10, 10)
+	assert.NilError(t, err, "add node failed")
+	err = cluster.addNode("test.host.02", nodeLabels, 100000000, 10, 10)
+	assert.NilError(t, err, "add node failed")
+
+	// create app and tasks
+	taskResource := common.NewResourceBuilder().
+		AddResource(siCommon.Memory, 50000000).
+		AddResource(siCommon.CPU, 5).
+		Build()
+	task1 := createTestPod("root.a", "app0001", "task0001", taskResource)
+	task2 := createTestPod("root.a", "app0001", "task0002", taskResource)
+	cluster.AddPod(task1)
+	cluster.AddPod(task2)
+
+	// wait for scheduling app and tasks
+	// verify app state
+	cluster.waitAndAssertApplicationState(t, "app0001", cache.ApplicationStates().Running)
+	// retry will fail
+	cluster.waitAndAssertTaskStateWithRetryParam(t, "app0001", "task0001", cache.TaskStates().Failed, constants.DefaultTaskRetryNum, constants.DefaultTaskRetryTimeInterval, 60*time.Second)
 	cluster.waitAndAssertTaskState(t, "app0001", "task0002", cache.TaskStates().Bound)
 
 	// one task get bound, one ask failed, so we are expecting only 1 allocation in the scheduler
 	err = cluster.waitAndVerifySchedulerAllocations("root.a",
@@ -259,6 +332,160 @@ partitions:
 	assert.NilError(t, err, "number of allocations is not expected, error")
 }
 
+func TestTaskRetrySuccessSpecificRetryParam(t *testing.T) {
+	configData := `
+partitions:
+  -
+    name: default
+    queues:
+      -
+        name: root
+        submitacl: "*"
+        queues:
+          -
+            name: a
+            resources:
+              guaranteed:
+                memory: 100000000
+                vcore: 10
+              max:
+                memory: 100000000
+                vcore: 10
+`
+	// init and register scheduler
+	cluster := MockScheduler{}
+	cluster.init()
+	assert.NilError(t, cluster.start(), "failed to start cluster")
+	defer cluster.stop()
+
+	// mock pod bind failures
+	cluster.apiProvider.MockBindFn(func(pod *v1.Pod, hostID string) error {
+		if pod.Name == "task0001" {
+			return fmt.Errorf("mocked error when binding the pod")
+		}
+		return nil
+	})
+
+	err := cluster.updateConfig(configData, nil)
+	assert.NilError(t, err, "update config failed")
+
+	nodeLabels := map[string]string{
+		"label1": "key1",
+		"label2": "key2",
+	}
+	// register nodes
+	err = cluster.addNode("test.host.01", nodeLabels, 100000000, 10, 10)
+	assert.NilError(t, err, "add node failed")
+	err = cluster.addNode("test.host.02", nodeLabels, 100000000, 10, 10)
+	assert.NilError(t, err, "add node failed")
+
+	// create app and tasks
+	taskResource := common.NewResourceBuilder().
+		AddResource(siCommon.Memory, 50000000).
+		AddResource(siCommon.CPU, 5).
+		Build()
+	task1 := createTestPod("root.a", "app0001", "task0001", taskResource)
+	taskRetryNum := 3
+	taskRetryTimeInterval := 2 * time.Second
+	annotations := map[string]string{
+		"yunikorn.apache.org/task-retry-num":      strconv.Itoa(taskRetryNum),
+		"yunikorn.apache.org/task-retry-interval": taskRetryTimeInterval.String(),
+	}
+	task1.SetAnnotations(annotations)
+	task2 := createTestPod("root.a", "app0001", "task0002", taskResource)
+	cluster.AddPod(task1)
+	cluster.AddPod(task2)
+
+	// wait for scheduling app and tasks
+	// verify app state
+	cluster.waitAndAssertApplicationState(t, "app0001", cache.ApplicationStates().Running)
+	// retry will succeed
+	cluster.waitAndAssertTaskStateWithRetryParam(t, "app0001", "task0001", cache.TaskStates().Allocated, taskRetryNum, taskRetryTimeInterval, 5*time.Second)
+	cluster.waitAndAssertTaskState(t, "app0001", "task0002", cache.TaskStates().Bound)
+
+	// both tasks eventually hold allocations, so we expect 2 allocations in the scheduler
+	err = cluster.waitAndVerifySchedulerAllocations("root.a",
+		"[mycluster]default", "app0001", 2)
+	assert.NilError(t, err, "number of allocations is not expected, error")
+}
+
+func TestTaskRetryFailureSpecificRetryParam(t *testing.T) {
+	configData := `
+partitions:
+  -
+    name: default
+    queues:
+      -
+        name: root
+        submitacl: "*"
+        queues:
+          -
+            name: a
+            resources:
+              guaranteed:
+                memory: 100000000
+                vcore: 10
+              max:
+                memory: 100000000
+                vcore: 10
+`
+	// init and register scheduler
+	cluster := MockScheduler{}
+	cluster.init()
+	assert.NilError(t, cluster.start(), "failed to start cluster")
+	defer cluster.stop()
+
+	// mock pod bind failures
+	cluster.apiProvider.MockBindFn(func(pod *v1.Pod, hostID string) error {
+		if pod.Name == "task0001" {
+			return fmt.Errorf("mocked error when binding the pod")
+		}
+		return nil
+	})
+
+	err := cluster.updateConfig(configData, nil)
+	assert.NilError(t, err, "update config failed")
+
+	nodeLabels := map[string]string{
+		"label1": "key1",
+		"label2": "key2",
+	}
+	// register nodes
+	err = cluster.addNode("test.host.01", nodeLabels, 100000000, 10, 10)
+	assert.NilError(t, err, "add node failed")
+	err = cluster.addNode("test.host.02", nodeLabels, 100000000, 10, 10)
+	assert.NilError(t, err, "add node failed")
+
+	// create app and tasks
+	taskResource := common.NewResourceBuilder().
+		AddResource(siCommon.Memory, 50000000).
+		AddResource(siCommon.CPU, 5).
+		Build()
+	task1 := createTestPod("root.a", "app0001", "task0001", taskResource)
+	taskRetryNum := 3
+	taskRetryTimeInterval := 2 * time.Second
+	annotations := map[string]string{
+		"yunikorn.apache.org/task-retry-num":      strconv.Itoa(taskRetryNum),
+		"yunikorn.apache.org/task-retry-interval": taskRetryTimeInterval.String(),
+	}
+	task1.SetAnnotations(annotations)
+	task2 := createTestPod("root.a", "app0001", "task0002", taskResource)
+	cluster.AddPod(task1)
+	cluster.AddPod(task2)
+
+	// wait for scheduling app and tasks
+	// verify app state
+	cluster.waitAndAssertApplicationState(t, "app0001", cache.ApplicationStates().Running)
+	// retry will fail
+	cluster.waitAndAssertTaskStateWithRetryParam(t, "app0001", "task0001", cache.TaskStates().Failed, taskRetryNum, taskRetryTimeInterval, 10*time.Second)
+	cluster.waitAndAssertTaskState(t, "app0001", "task0002", cache.TaskStates().Bound)
+
+	// one task gets bound, one task fails; verify the resulting allocations in the scheduler
+	err = cluster.waitAndVerifySchedulerAllocations("root.a",
+		"[mycluster]default", "app0001", 2)
+	assert.NilError(t, err, "number of allocations is not expected, error")
+}
+
 // simulate PVC error during Context.AssumePod() call
 func TestAssumePodError(t *testing.T) {
 	configData := `