Skip to content

Commit

Permalink
[YUNIKORN-2879] [shim] yunikorn unschedulable pods pending forever
Browse files Browse the repository at this point in the history
* task postfail or rejected reschedule
  • Loading branch information
zhangjian16 committed Oct 18, 2024
1 parent ee8d1c0 commit e434137
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 25 deletions.
111 changes: 90 additions & 21 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,20 @@ import (
)

type Task struct {
taskID string
alias string
applicationID string
application *Application
podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons
context *Context
createTime time.Time
placeholder bool
originator bool
sm *fsm.FSM

taskID string
alias string
applicationID string
application *Application
podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons
context *Context
createTime time.Time
retryTimer *time.Timer // timer for task retry
placeholder bool
originator bool
sm *fsm.FSM
retryTimeInterval time.Duration
retryNum int
attempt int
// mutable resources, require locking
allocationKey string
nodeName string
Expand Down Expand Up @@ -109,6 +112,9 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource,
if tgName := utils.GetTaskGroupFromPodSpec(pod); tgName != "" {
task.taskGroupName = tgName
}
task.retryNum = utils.GetTaskRetryNumFromPodSpec(pod)
task.retryTimeInterval = utils.GetTaskRetryTimeIntervalFromPodSpec(pod)

task.initialize()
return task
}
Expand Down Expand Up @@ -141,6 +147,13 @@ func (task *Task) GetTaskID() string {
return task.taskID
}

func (task *Task) GetTaskRetryNum() int {
return task.retryNum
}
func (task *Task) GetTaskRetryTimeInterval() time.Duration {
return task.retryTimeInterval
}

func (task *Task) IsPlaceholder() bool {
return task.placeholder
}
Expand Down Expand Up @@ -375,8 +388,7 @@ func (task *Task) postTaskAllocated() {
zap.String("podName", task.pod.Name),
zap.String("podUID", string(task.pod.UID)))
if err := task.context.bindPodVolumes(task.pod); err != nil {
log.Log(log.ShimCacheTask).Error("bind volumes to pod failed", zap.String("taskID", task.taskID), zap.Error(err))
task.failWithEvent(fmt.Sprintf("bind volumes to pod failed, name: %s, %s", task.alias, err.Error()), "PodVolumesBindFailure")
task.RetryThenFailTask(fmt.Sprintf("bind volumes to pod failed, name: %s, %s", task.alias, err.Error()), "PodVolumesBindFailure")
return
}

Expand All @@ -385,8 +397,7 @@ func (task *Task) postTaskAllocated() {
zap.String("podUID", string(task.pod.UID)))

if err := task.context.apiProvider.GetAPIs().KubeClient.Bind(task.pod, task.nodeName); err != nil {
log.Log(log.ShimCacheTask).Error("bind pod to node failed", zap.String("taskID", task.taskID), zap.Error(err))
task.failWithEvent(fmt.Sprintf("bind pod to node failed, name: %s, %s", task.alias, err.Error()), "PodBindFailure")
task.RetryThenFailTask(fmt.Sprintf("bind pod to node failed, name: %s, %s", task.alias, err.Error()), "PodBindFailure")
return
}

Expand Down Expand Up @@ -453,12 +464,8 @@ func (task *Task) postTaskBound() {
}

func (task *Task) postTaskRejected() {
// currently, once task is rejected by scheduler, we directly move task to failed state.
// so this function simply triggers the state transition when it is rejected.
// but further, we can introduce retry mechanism if necessary.
dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))

// once task is rejected by scheduler, we retry before move task to failed state.
task.RetryThenFailTask(fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias), "TaskRejected")
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeWarning, "TaskRejected", "TaskRejected",
"Task %s is rejected by the scheduler", task.alias)
Expand Down Expand Up @@ -655,3 +662,65 @@ func (task *Task) SetTaskPod(pod *v1.Pod) {
task.updateAllocation()
}
}
func (task *Task) setRetryTimer(timeout time.Duration, currentState string, event RetryTaskEvent) {
log.Log(log.ShimContext).Debug("Task retry timer initiated",
zap.String("appID", task.applicationID),
zap.String("TaskID", task.taskID),
zap.String("state", task.sm.Current()),
zap.Duration("timeout", timeout))

task.retryTimer = time.AfterFunc(timeout, task.timeoutRetryTimer(currentState, event))
}

func (task *Task) timeoutRetryTimer(expectedState string, event RetryTaskEvent) func() {
return func() {
task.lock.Lock()
defer task.lock.Unlock()
if expectedState == task.sm.Current() {
dispatcher.Dispatch(event)
}

}
}

func (task *Task) RetryThenFailTask(errorMessage, actionReason string) {
if task.attempt < task.retryNum {
log.Log(log.ShimCacheTask).Info("task failed, task will retrying", zap.String("taskID", task.taskID), zap.Int("attempt", task.attempt), zap.Int("retryNum", task.retryNum), zap.Duration("retryTimeInterval", task.retryTimeInterval), zap.String("errorMessage", errorMessage))
task.attempt++
task.setRetryTimer(task.retryTimeInterval, task.sm.Current(), NewRetryTaskEvent(task.applicationID, task.taskID, "retrying task"))
} else {
log.Log(log.ShimCacheTask).Error("task failed ", zap.String("taskID", task.taskID), zap.String("errorMessage", errorMessage))
task.failWithEvent(errorMessage, actionReason)
}
}
func (task *Task) setRetryTimer(timeout time.Duration, currentState string, event RetryTaskEvent) {

Check failure on line 696 in pkg/cache/task.go

View workflow job for this annotation

GitHub Actions / build

method Task.setRetryTimer already declared at pkg/cache/task.go:665:19

Check failure on line 696 in pkg/cache/task.go

View workflow job for this annotation

GitHub Actions / build

method Task.setRetryTimer already declared at pkg/cache/task.go:665:19
log.Log(log.ShimContext).Debug("Task retry timer initiated",
zap.String("appID", task.applicationID),
zap.String("TaskID", task.taskID),
zap.String("state", task.sm.Current()),
zap.Duration("timeout", timeout))

task.retryTimer = time.AfterFunc(timeout, task.timeoutRetryTimer(currentState, event))
}

func (task *Task) timeoutRetryTimer(expectedState string, event RetryTaskEvent) func() {

Check failure on line 706 in pkg/cache/task.go

View workflow job for this annotation

GitHub Actions / build

method Task.timeoutRetryTimer already declared at pkg/cache/task.go:675:19
return func() {
task.lock.Lock()
defer task.lock.Unlock()
if expectedState == task.sm.Current() {
dispatcher.Dispatch(event)
}

}
}

func (task *Task) RetryThenFailTask(errorMessage, actionReason string) {

Check failure on line 717 in pkg/cache/task.go

View workflow job for this annotation

GitHub Actions / build

method Task.RetryThenFailTask already declared at pkg/cache/task.go:686:19) (typecheck)
if task.attempt < task.retryNum {
log.Log(log.ShimCacheTask).Info("task failed, task will retrying", zap.String("taskID", task.taskID), zap.Int("attempt", task.attempt), zap.Int("retryNum", task.retryNum), zap.Duration("retryTimeInterval", task.retryTimeInterval), zap.String("errorMessage", errorMessage))
task.attempt++
task.setRetryTimer(task.retryTimeInterval, task.sm.Current(), NewRetryTaskEvent(task.applicationID, task.taskID, "retrying task"))
} else {
log.Log(log.ShimCacheTask).Error("task failed ", zap.String("taskID", task.taskID), zap.String("errorMessage", errorMessage))
task.failWithEvent(errorMessage, actionReason)
}
}
53 changes: 52 additions & 1 deletion pkg/cache/task_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ const (
TaskFail
KillTask
TaskKilled
TaskRetry
)

func (ae TaskEventType) String() string {
return [...]string{"InitTask", "SubmitTask", "TaskAllocated", "TaskRejected", "TaskBound", "CompleteTask", "TaskFail", "KillTask", "TaskKilled"}[ae]
return [...]string{"InitTask", "SubmitTask", "TaskAllocated", "TaskRejected", "TaskBound", "CompleteTask", "TaskFail", "KillTask", "TaskKilled", "TaskRetry"}[ae]
}

// ------------------------
Expand Down Expand Up @@ -202,6 +203,20 @@ type FailTaskEvent struct {
message string
}

type RetryTaskEvent struct {

Check failure on line 206 in pkg/cache/task_state.go

View workflow job for this annotation

GitHub Actions / build

other declaration of RetryTaskEvent

Check failure on line 206 in pkg/cache/task_state.go

View workflow job for this annotation

GitHub Actions / build

other declaration of RetryTaskEvent
applicationID string
taskID string
event TaskEventType
message string
}

type RetryTaskEvent struct {

Check failure on line 213 in pkg/cache/task_state.go

View workflow job for this annotation

GitHub Actions / build

RetryTaskEvent redeclared in this block

Check failure on line 213 in pkg/cache/task_state.go

View workflow job for this annotation

GitHub Actions / build

RetryTaskEvent redeclared in this block
applicationID string
taskID string
event TaskEventType
message string
}

func NewFailTaskEvent(appID string, taskID string, failedMessage string) FailTaskEvent {
return FailTaskEvent{
applicationID: appID,
Expand All @@ -211,6 +226,33 @@ func NewFailTaskEvent(appID string, taskID string, failedMessage string) FailTas
}
}

func NewRetryTaskEvent(appID string, taskID string, retryMessage string) RetryTaskEvent {
return RetryTaskEvent{
applicationID: appID,
taskID: taskID,
event: TaskRetry,
message: retryMessage,
}
}

func (fe RetryTaskEvent) GetEvent() string {
return fe.event.String()
}

func (fe RetryTaskEvent) GetArgs() []interface{} {
args := make([]interface{}, 1)
args[0] = fe.message
return args
}

func (fe RetryTaskEvent) GetTaskID() string {
return fe.taskID
}

func (fe RetryTaskEvent) GetApplicationID() string {
return fe.applicationID
}

func (fe FailTaskEvent) GetEvent() string {
return fe.event.String()
}
Expand Down Expand Up @@ -371,6 +413,11 @@ func eventDesc(states *TStates) fsm.Events {
Src: []string{states.New, states.Pending, states.Scheduling, states.Rejected, states.Allocated},
Dst: states.Failed,
},
{
Name: TaskRetry.String(),
Src: []string{states.Rejected, states.Allocated},
Dst: states.Pending,
},
}
}

Expand Down Expand Up @@ -433,6 +480,10 @@ func callbacks(states *TStates) fsm.Callbacks {
task := event.Args[0].(*Task) //nolint:errcheck
task.beforeTaskCompleted()
},
beforeHook(TaskRetry): func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
task.releaseAllocation()
},
SubmitTask.String(): func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
task.handleSubmitTaskEvent()
Expand Down
7 changes: 7 additions & 0 deletions pkg/common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package constants

import (
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"time"
)

// Common
Expand Down Expand Up @@ -56,6 +57,9 @@ const DefaultAppNamespace = "default"
const DefaultUserLabel = DomainYuniKorn + "username"
const DefaultUser = "nobody"

const DefaultTaskRetryTimeInterval = 5 * time.Second
const DefaultTaskRetryNum = 5

// Spark
const SparkLabelAppID = "spark-app-selector"

Expand Down Expand Up @@ -87,6 +91,9 @@ const SchedulingPolicyParamDelimiter = " "
const SchedulingPolicyStyleParam = "gangSchedulingStyle"
const SchedulingPolicyStyleParamDefault = "Soft"

const AnnotationTaskRetryName = DomainYuniKorn + "task-retry-num"
const AnnotationTaskRetryIntervalName = DomainYuniKorn + "task-retry-interval"

var SchedulingPolicyStyleParamValues = map[string]string{"Hard": "Hard", "Soft": "Soft"}

const ApplicationInsufficientResourcesFailure = "ResourceReservationTimeout"
Expand Down
18 changes: 18 additions & 0 deletions pkg/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,24 @@ func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
return GetPodAnnotationValue(pod, constants.AnnotationTaskGroupName)
}

func GetTaskRetryNumFromPodSpec(pod *v1.Pod) int {
if value := GetPodAnnotationValue(pod, constants.AnnotationTaskRetryName); value != "" {
if v, err := strconv.Atoi(value); err == nil {
return v
}
}
return constants.DefaultTaskRetryNum
}

func GetTaskRetryTimeIntervalFromPodSpec(pod *v1.Pod) time.Duration {
if value := GetPodAnnotationValue(pod, constants.AnnotationTaskRetryIntervalName); value != "" {
if v, err := time.ParseDuration(value); err == nil {
return v
}
}
return constants.DefaultTaskRetryTimeInterval
}

func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
if value := GetPodAnnotationValue(pod, constants.AnnotationPlaceholderFlag); value != "" {
if v, err := strconv.ParseBool(value); err == nil {
Expand Down
31 changes: 30 additions & 1 deletion pkg/shim/scheduler_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,42 @@ func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expe
}
}

func (fc *MockScheduler) waitAndAssertTaskStateWithRetryParam(t *testing.T, appID, taskID, expectedState string, retryNum int, retryTimeInterval time.Duration, timeInteval time.Duration) {
app := fc.context.GetApplication(appID)
assert.Equal(t, app != nil, true)
assert.Equal(t, app.GetApplicationID(), appID)

task := app.GetTask(taskID)
assert.Equal(t, task.GetTaskRetryNum(), retryNum)
assert.Equal(t, task.GetTaskRetryTimeInterval(), retryTimeInterval)
deadline := time.Now().Add(timeInteval)
for {
if task.GetTaskState() == expectedState {
break
}
log.Log(log.Test).Info("waiting for task state",
zap.String("expected", expectedState),
zap.String("actual", task.GetTaskState()))
time.Sleep(time.Second)
if time.Now().After(deadline) {
t.Errorf("task %s doesn't reach expected state in given time, expecting: %s, actual: %s",
taskID, expectedState, task.GetTaskState())
return
}
}
}

func (fc *MockScheduler) waitAndAssertTaskState(t *testing.T, appID, taskID, expectedState string) {
fc.waitAndAssertTaskStateAfterTimeInterval(t, appID, taskID, expectedState, 10*time.Second)
}

func (fc *MockScheduler) waitAndAssertTaskStateAfterTimeInterval(t *testing.T, appID, taskID, expectedState string, timeInteval time.Duration) {
app := fc.context.GetApplication(appID)
assert.Equal(t, app != nil, true)
assert.Equal(t, app.GetApplicationID(), appID)

task := app.GetTask(taskID)
deadline := time.Now().Add(10 * time.Second)
deadline := time.Now().Add(timeInteval)
for {
if task.GetTaskState() == expectedState {
break
Expand Down
Loading

0 comments on commit e434137

Please sign in to comment.