Skip to content

Commit

Permalink
Sidecar terminator ignores the exit code of the sidecar container
Browse files Browse the repository at this point in the history
Signed-off-by: liuzhenwei <[email protected]>

add ut

Signed-off-by: liuzhenwei <[email protected]>

add crr event handler ut

Signed-off-by: liuzhenwei <[email protected]>

fix crr status

Signed-off-by: liuzhenwei <[email protected]>

fix: support pod status reporting by both kubelet and the CRR controller

Signed-off-by: liuzhenwei <[email protected]>
  • Loading branch information
diannaowa committed Jun 5, 2023
1 parent 40e62c6 commit cc6de91
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 65 deletions.
8 changes: 8 additions & 0 deletions pkg/controller/sidecarterminator/kill_container_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,11 @@ func filterUncompletedSidecars(pod *corev1.Pod, sidecars sets.String) sets.Strin
// getCRRName derives the deterministic ContainerRecreateRequest name used to
// terminate the sidecars of the given pod, keyed by the pod's UID.
func getCRRName(pod *corev1.Pod) string {
	return "sidecar-termination-" + string(pod.UID)
}

// getJobName returns the name of the pod's controlling owner reference, or an
// empty string when the pod has no controller owner.
func getJobName(pod *corev1.Pod) string {
	if owner := metav1.GetControllerOf(pod); owner != nil {
		return owner.Name
	}
	return ""
}
145 changes: 134 additions & 11 deletions pkg/controller/sidecarterminator/sidecar_terminator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@ import (
"strings"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"k8s.io/apimachinery/pkg/types"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
)

func init() {
Expand Down Expand Up @@ -70,6 +76,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Client: cli,
recorder: recorder,
scheme: mgr.GetScheme(),
clock: clock.RealClock{},
}
}

Expand Down Expand Up @@ -99,6 +106,7 @@ type ReconcileSidecarTerminator struct {
client.Client
recorder record.EventRecorder
scheme *runtime.Scheme
clock clock.Clock
}

// Reconcile get the pod whose sidecar containers should be stopped, and stop them.
Expand Down Expand Up @@ -129,8 +137,8 @@ func (r *ReconcileSidecarTerminator) doReconcile(pod *corev1.Pod) (reconcile.Res
return reconcile.Result{}, nil
}

if containersCompleted(pod, getSidecar(pod)) {
klog.V(3).Infof("SidecarTerminator -- all sidecars of pod(%v/%v) have been completed, no need to process", pod.Namespace, pod.Name)
if containersSucceeded(pod, getSidecar(pod)) {
klog.V(3).Infof("SidecarTerminator -- all sidecars of pod(%v/%v) have been succeeded, no need to process", pod.Namespace, pod.Name)
return reconcile.Result{}, nil
}

Expand All @@ -139,7 +147,8 @@ func (r *ReconcileSidecarTerminator) doReconcile(pod *corev1.Pod) (reconcile.Res
return reconcile.Result{}, nil
}

sidecarNeedToExecuteKillContainer, sidecarNeedToExecuteInPlaceUpdate, err := r.groupSidecars(pod)
sidecarNeedToExecuteKillContainer, sidecarNeedToExecuteInPlaceUpdate, sidecarNeedToSyncStatus, err := r.groupSidecars(pod)

if err != nil {
return reconcile.Result{}, err
}
Expand All @@ -152,23 +161,126 @@ func (r *ReconcileSidecarTerminator) doReconcile(pod *corev1.Pod) (reconcile.Res
return reconcile.Result{}, err
}

if sidecarNeedToSyncStatus.Len() > 0 {
if err := r.syncSidecarStatus(pod, sidecarNeedToSyncStatus); err != nil {
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
}

func (r *ReconcileSidecarTerminator) groupSidecars(pod *corev1.Pod) (sets.String, sets.String, error) {
// syncSidecarStatus overwrites the reported status of the given sidecar
// containers so that a pod whose main containers have finished can reach a
// terminal phase regardless of how the sidecars exited.
//
// Two situations are rewritten:
//   - a sidecar terminated with a non-zero exit code: the code is forced to 0
//     with reason "Completed" so the sidecar's failure does not fail the job;
//   - a sidecar still reported as running (the CRR has already completed but
//     kubelet has not synced the status yet): it is marked terminated with
//     exit code 0 as of r.clock.Now().
//
// When every sidecar status has been rewritten, the pod phase and the
// PodReady/ContainersReady conditions are driven to Succeeded or Failed based
// solely on the main containers' results. Returns any API error from the
// status update.
func (r *ReconcileSidecarTerminator) syncSidecarStatus(pod *corev1.Pod, sidecars sets.String) error {
	// Nothing to do once the pod already reflects the job's terminal state.
	if deduceWhetherTheJobIsCompletedFromThePod(pod) {
		return nil
	}

	newSidecarStatus := make(map[string]corev1.ContainerStatus, sidecars.Len())
	// Iterate the reported statuses directly: ContainerStatuses is not
	// guaranteed to be index-aligned with (or as long as) Spec.Containers, so
	// indexing it by the spec index could read the wrong container or panic.
	for i := range pod.Status.ContainerStatuses {
		status := &pod.Status.ContainerStatuses[i]
		if !sidecars.Has(status.Name) {
			continue
		}
		switch {
		case status.State.Terminated != nil && status.State.Terminated.ExitCode != 0:
			// The sidecar exited non-zero; rewrite it as "Completed" so the
			// job controller does not fail the pod because of the sidecar.
			// Kubelet and this controller would otherwise keep reporting
			// conflicting statuses until the job reaches a completed phase.
			klog.V(3).Infof("SidecarTerminator -- ignore the non-zero exitCode of the sidecar container %s/%s", pod.Name, status.Name)
			newStatus := *status.DeepCopy()
			newStatus.Ready = false
			newStatus.Started = &newStatus.Ready
			newStatus.State = corev1.ContainerState{
				Terminated: &corev1.ContainerStateTerminated{
					ExitCode:    0,
					Reason:      "Completed",
					StartedAt:   status.State.Terminated.StartedAt,
					FinishedAt:  status.State.Terminated.FinishedAt,
					ContainerID: status.ContainerID,
				},
			}
			newSidecarStatus[status.Name] = newStatus
		case status.State.Terminated == nil && status.State.Running != nil:
			// The CRR has completed but kubelet still reports the sidecar as
			// running; mark it successfully terminated as of now.
			klog.V(3).Infof("SidecarTerminator -- sync the status of the sidecar container %s/%s,crr has reached the completed phase", pod.Name, status.Name)
			newStatus := *status.DeepCopy()
			newStatus.Ready = false
			newStatus.Started = &newStatus.Ready
			newStatus.State = corev1.ContainerState{
				Terminated: &corev1.ContainerStateTerminated{
					ExitCode:    0,
					Reason:      "Completed",
					StartedAt:   status.State.Running.StartedAt,
					FinishedAt:  metav1.NewTime(r.clock.Now()),
					ContainerID: status.ContainerID,
				},
			}
			newSidecarStatus[status.Name] = newStatus
		}
	}

	// Only touch the API server when at least one status was actually
	// rewritten; previously an update was issued whenever any sidecar status
	// was merely inspected, producing no-op writes.
	if len(newSidecarStatus) == 0 {
		return nil
	}

	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		latestPod := &corev1.Pod{}
		if err := r.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, latestPod); err != nil {
			return err
		}
		// Splice the rewritten statuses into the freshest copy of the pod,
		// matching by container name rather than by spec index.
		for i := range latestPod.Status.ContainerStatuses {
			if status, ok := newSidecarStatus[latestPod.Status.ContainerStatuses[i].Name]; ok {
				latestPod.Status.ContainerStatuses[i] = status
			}
		}

		// If every sidecar was rewritten, drive the pod to a terminal phase:
		// the pod succeeds iff the main containers succeeded, ignoring the
		// sidecars' real exit codes.
		if getSidecar(latestPod).Len() == len(newSidecarStatus) {
			if containersSucceeded(latestPod, getMain(latestPod)) {
				latestPod.Status.Phase = corev1.PodSucceeded
				for i, condition := range latestPod.Status.Conditions {
					if condition.Type == corev1.PodReady || condition.Type == corev1.ContainersReady {
						latestPod.Status.Conditions[i].Reason = "PodCompleted"
						latestPod.Status.Conditions[i].Status = corev1.ConditionTrue
					}
				}
			} else {
				latestPod.Status.Phase = corev1.PodFailed
				for i, condition := range latestPod.Status.Conditions {
					if condition.Type == corev1.PodReady || condition.Type == corev1.ContainersReady {
						latestPod.Status.Conditions[i].Reason = "PodFailed"
						latestPod.Status.Conditions[i].Status = corev1.ConditionFalse
					}
				}
			}
		}

		return r.Status().Update(context.TODO(), latestPod)
	})
}

func (r *ReconcileSidecarTerminator) groupSidecars(pod *corev1.Pod) (sets.String, sets.String, sets.String, error) {
runningOnVK, err := IsPodRunningOnVirtualKubelet(pod, r.Client)
if err != nil {
return nil, nil, client.IgnoreNotFound(err)
return nil, nil, nil, client.IgnoreNotFound(err)
}

inPlaceUpdate := sets.NewString()
killContainer := sets.NewString()
syncStatusContainer := sets.NewString()
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
for j := range container.Env {
if !runningOnVK && container.Env[j].Name == appsv1alpha1.KruiseTerminateSidecarEnv &&
strings.EqualFold(container.Env[j].Value, "true") {
killContainer.Insert(container.Name)
syncStatusContainer.Insert(container.Name)
break
}
if container.Env[j].Name == appsv1alpha1.KruiseTerminateSidecarWithImageEnv &&
Expand All @@ -177,7 +289,7 @@ func (r *ReconcileSidecarTerminator) groupSidecars(pod *corev1.Pod) (sets.String
}
}
}
return killContainer, inPlaceUpdate, nil
return killContainer, inPlaceUpdate, syncStatusContainer, nil
}

func containersCompleted(pod *corev1.Pod, containers sets.String) bool {
Expand Down Expand Up @@ -208,3 +320,14 @@ func containersSucceeded(pod *corev1.Pod, containers sets.String) bool {
}
return true
}

// deduceWhetherTheJobIsCompletedFromThePod reports whether the pod's phase
// already agrees with the outcome of its main containers: Succeeded when they
// all succeeded, Failed when they all completed but at least one failed.
// While any main container is still incomplete it returns false.
func deduceWhetherTheJobIsCompletedFromThePod(pod *corev1.Pod) bool {
	main := getMain(pod)
	if !containersCompleted(pod, main) {
		return false
	}
	if containersSucceeded(pod, main) {
		return pod.Status.Phase == corev1.PodSucceeded
	}
	return pod.Status.Phase == corev1.PodFailed
}
Loading

0 comments on commit cc6de91

Please sign in to comment.