Skip to content

Commit

Permalink
Use new logic for drain in config daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
e0ne committed Mar 30, 2023
1 parent c9cf22b commit f679601
Showing 1 changed file with 21 additions and 56 deletions.
77 changes: 21 additions & 56 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubectl/pkg/drain"

Expand Down Expand Up @@ -91,6 +89,7 @@ type Daemon struct {

node *corev1.Node

// TODO(e0ne): remove it
drainable bool

disableDrain bool
Expand All @@ -108,6 +107,7 @@ const (
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDraining = "Draining"
annoDrainRequired = "Drain_Required"
annoMcpPaused = "Draining_MCP_Paused"
syncStatusSucceeded = "Succeeded"
syncStatusFailed = "Failed"
Expand Down Expand Up @@ -496,16 +496,21 @@ func (dn *Daemon) nodeStateSyncHandler() error {
return err
}
}

if dn.nodeHasAnnotation(annoKey, annoDrainRequired) {
glog.Info("nodeStateSyncHandler(): waiting for drain")
return nil
}

if reqDrain {
if !dn.isNodeDraining() {
if !dn.disableDrain {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
glog.Infof("nodeStateSyncHandler(): apply 'Drain_Required' label for node")

glog.Infof("nodeStateSyncHandler(): get drain lock for sriov daemon")
done := make(chan bool)
go dn.getDrainLock(ctx, done)
<-done
if err := dn.applyDrainRequired(); err != nil {
return err
}
return nil
}

if dn.openshiftContext.IsOpenshiftCluster() && !dn.openshiftContext.IsHypershift() {
Expand Down Expand Up @@ -571,6 +576,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
}

func (dn *Daemon) nodeHasAnnotation(annoKey string, value string) bool {
// TODO(e0ne): re-use cluster.NodeHasAnnotation function
// Check if node already contains annotation
if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == value) {
return true
Expand Down Expand Up @@ -755,55 +761,14 @@ func (dn *Daemon) getNodeMachinePool() error {
return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node")
}

func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) {
var err error

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: "config-daemon-draining-lock",
Namespace: namespace,
},
Client: dn.kubeClient.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: dn.name,
},
func (dn *Daemon) applyDrainRequired() error {
glog.V(2).Info("applyDrainRequired(): no other node is draining")
err := dn.annotateNode(dn.name, annoDrainRequired)
if err != nil {
glog.Errorf("applyDrainRequired(): Failed to annotate node: %v", err)
return err
}

// start the leader election
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 5 * time.Second,
RenewDeadline: 3 * time.Second,
RetryPeriod: 1 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
glog.V(2).Info("getDrainLock(): started leading")
for {
time.Sleep(3 * time.Second)
if dn.node.Annotations[annoKey] == annoMcpPaused {
// The node in Draining_MCP_Paused state, no other node is draining. Skip drainable checking
done <- true
return
}
if dn.drainable {
glog.V(2).Info("getDrainLock(): no other node is draining")
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("getDrainLock(): Failed to annotate node: %v", err)
continue
}
done <- true
return
}
glog.V(2).Info("getDrainLock(): other node is draining, wait...")
}
},
OnStoppedLeading: func() {
glog.V(2).Info("getDrainLock(): stopped leading")
},
},
})
return nil
}

func (dn *Daemon) pauseMCP() error {
Expand Down

0 comments on commit f679601

Please sign in to comment.