diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 727ffe642..e64d6a5f7 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -33,8 +33,6 @@ import (
 	"k8s.io/client-go/kubernetes"
 	listerv1 "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/leaderelection"
-	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubectl/pkg/drain"
 
@@ -91,6 +89,7 @@ type Daemon struct {
 
 	node *corev1.Node
 
+	// TODO(e0ne): remove it
 	drainable bool
 
 	disableDrain bool
@@ -108,6 +107,7 @@ const (
 	annoKey = "sriovnetwork.openshift.io/state"
 	annoIdle = "Idle"
 	annoDraining = "Draining"
+	annoDrainRequired = "Drain_Required"
 	annoMcpPaused = "Draining_MCP_Paused"
 	syncStatusSucceeded = "Succeeded"
 	syncStatusFailed = "Failed"
@@ -496,16 +496,21 @@ func (dn *Daemon) nodeStateSyncHandler() error {
 			return err
 		}
 	}
+
+	if dn.nodeHasAnnotation(annoKey, annoDrainRequired) {
+		glog.Info("nodeStateSyncHandler(): waiting for drain")
+		return nil
+	}
+
 	if reqDrain {
 		if !dn.isNodeDraining() {
 			if !dn.disableDrain {
-				ctx, cancel := context.WithCancel(context.TODO())
-				defer cancel()
+				glog.Infof("nodeStateSyncHandler(): apply 'Drain_Required' label for node")
-				glog.Infof("nodeStateSyncHandler(): get drain lock for sriov daemon")
-				done := make(chan bool)
-				go dn.getDrainLock(ctx, done)
-				<-done
+				if err := dn.applyDrainRequired(); err != nil {
+					return err
+				}
+				return nil
 			}
 
 			if dn.openshiftContext.IsOpenshiftCluster() && !dn.openshiftContext.IsHypershift() {
@@ -571,6 +576,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
 }
 
 func (dn *Daemon) nodeHasAnnotation(annoKey string, value string) bool {
+	// TODO(e0ne): re-use cluster.NodeHasAnnotation function
 	// Check if node already contains annotation
 	if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == value) {
 		return true
@@ -755,55 +761,14 @@ func (dn *Daemon) getNodeMachinePool() error {
 	return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node")
 }
 
-func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) {
-	var err error
-
-	lock := &resourcelock.LeaseLock{
-		LeaseMeta: metav1.ObjectMeta{
-			Name: "config-daemon-draining-lock",
-			Namespace: namespace,
-		},
-		Client: dn.kubeClient.CoordinationV1(),
-		LockConfig: resourcelock.ResourceLockConfig{
-			Identity: dn.name,
-		},
+func (dn *Daemon) applyDrainRequired() error {
+	glog.V(2).Info("applyDrainRequired(): no other node is draining")
+	err := dn.annotateNode(dn.name, annoDrainRequired)
+	if err != nil {
+		glog.Errorf("applyDrainRequired(): Failed to annotate node: %v", err)
+		return err
 	}
-
-	// start the leader election
-	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-		Lock: lock,
-		ReleaseOnCancel: true,
-		LeaseDuration: 5 * time.Second,
-		RenewDeadline: 3 * time.Second,
-		RetryPeriod: 1 * time.Second,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(ctx context.Context) {
-				glog.V(2).Info("getDrainLock(): started leading")
-				for {
-					time.Sleep(3 * time.Second)
-					if dn.node.Annotations[annoKey] == annoMcpPaused {
-						// The node in Draining_MCP_Paused state, no other node is draining. Skip drainable checking
-						done <- true
-						return
-					}
-					if dn.drainable {
-						glog.V(2).Info("getDrainLock(): no other node is draining")
-						err = dn.annotateNode(dn.name, annoDraining)
-						if err != nil {
-							glog.Errorf("getDrainLock(): Failed to annotate node: %v", err)
-							continue
-						}
-						done <- true
-						return
-					}
-					glog.V(2).Info("getDrainLock(): other node is draining, wait...")
-				}
-			},
-			OnStoppedLeading: func() {
-				glog.V(2).Info("getDrainLock(): stopped leading")
-			},
-		},
-	})
+	return nil
 }
 
 func (dn *Daemon) pauseMCP() error {
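
Regarding the `TODO(e0ne): re-use cluster.NodeHasAnnotation function` comment added above: the daemon-local `nodeHasAnnotation` check duplicates the shared helper the TODO refers to. A minimal sketch of what such a helper could look like is below; the package name and the `corev1.Node` parameter are assumptions for illustration, not code from this diff or repository.

```go
// Hypothetical sketch of a shared NodeHasAnnotation helper, mirroring the
// daemon-local check in nodeHasAnnotation(). Package path and signature are
// assumed; the real helper may differ.
package cluster

import corev1 "k8s.io/api/core/v1"

// NodeHasAnnotation reports whether the node carries annoKey with exactly the
// given value, e.g. "sriovnetwork.openshift.io/state" == "Drain_Required".
func NodeHasAnnotation(node corev1.Node, annoKey string, value string) bool {
	anno, ok := node.Annotations[annoKey]
	return ok && anno == value
}
```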