Skip to content

Commit

Permalink
Clean up duplicated and obsolete logic
Browse files Browse the repository at this point in the history
  • Loading branch information
e0ne committed Jun 20, 2023
1 parent d3ceae5 commit d525d52
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 42 deletions.
6 changes: 3 additions & 3 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

drainingNodes := 0
for _, node := range nodeList.Items {
if utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, "Draining") || utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, "Draining_MCP_Paused") {
if utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.AnnoDraining) || utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.AnnoMcpPaused) {
drainingNodes++
}
}
Expand All @@ -78,10 +78,10 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

for _, node := range nodeList.Items {
if utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, "Drain_Required") {
if utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.AnnoDrainRequired) {
if config.Spec.MaxParallelNodeConfiguration == 0 || drainingNodes < config.Spec.MaxParallelNodeConfiguration {
reqLogger.Info("Start draining node", "node", node.Name)
patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, constants.NodeDrainAnnotation, "Draining"))
patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, constants.NodeDrainAnnotation, constants.AnnoDraining))
err = dr.Client.Patch(context.TODO(), &node, client.RawPatch(types.StrategicMergePatchType, patch))
if err != nil {
return reconcile.Result{}, err
Expand Down
6 changes: 6 additions & 0 deletions pkg/consts/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ const (
DPConfigFileName = "config.json"
OVSHWOLMachineConfigNameSuffix = "ovs-hw-offload"

NodeDrainAnnotation = "sriovnetwork.openshift.io/state"
AnnoIdle = "Idle"
AnnoDrainRequired = "Drain_Required"
AnnoMcpPaused = "Draining_MCP_Paused"
AnnoDraining = "Draining"

LinkTypeEthernet = "ether"
LinkTypeInfiniband = "infiniband"

Expand Down
49 changes: 10 additions & 39 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
Expand Down Expand Up @@ -89,9 +88,6 @@ type Daemon struct {

node *corev1.Node

// TODO(e0ne): remove it
drainable bool

disableDrain bool

nodeLister listerv1.NodeLister
Expand All @@ -104,11 +100,6 @@ type Daemon struct {
const (
rdmaScriptsPath = "/bindata/scripts/enable-rdma.sh"
udevScriptsPath = "/bindata/scripts/load-udev.sh"
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDrainRequired = "Drain_Required"
annoMcpPaused = "Draining_MCP_Paused"
annoDraining = "Draining"
syncStatusSucceeded = "Succeeded"
syncStatusFailed = "Failed"
syncStatusInProgress = "InProgress"
Expand Down Expand Up @@ -274,7 +265,7 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
}

glog.Info("Starting workers")
// Launch one workers to process
// Launch one worker to process
go wait.Until(dn.runWorker, time.Second, stopCh)
glog.Info("Started workers")

Expand Down Expand Up @@ -377,17 +368,6 @@ func (dn *Daemon) nodeUpdateHandler(old, new interface{}) {
return
}
dn.node = node.DeepCopy()
nodes, err := dn.nodeLister.List(labels.Everything())
if err != nil {
return
}
for _, node := range nodes {
if node.GetName() != dn.name && (node.Annotations[annoKey] == annoDraining || node.Annotations[annoKey] == annoMcpPaused) {
dn.drainable = false
return
}
}
dn.drainable = true
}

func (dn *Daemon) operatorConfigAddHandler(obj interface{}) {
Expand Down Expand Up @@ -497,7 +477,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
}
}

if dn.nodeHasAnnotation(annoKey, annoDrainRequired) {
if utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.AnnoDrainRequired) {
glog.Info("nodeStateSyncHandler(): waiting for drain")
return nil
}
Expand Down Expand Up @@ -561,8 +541,8 @@ func (dn *Daemon) nodeStateSyncHandler() error {
return err
}
} else {
if !dn.nodeHasAnnotation(annoKey, annoIdle) {
if err := dn.annotateNode(dn.name, annoIdle); err != nil {
if !utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.AnnoIdle) {
if err := dn.annotateNode(dn.name, consts.AnnoIdle); err != nil {
glog.Errorf("nodeStateSyncHandler(): failed to annotate node: %v", err)
return err
}
Expand All @@ -579,15 +559,6 @@ func (dn *Daemon) nodeStateSyncHandler() error {
return nil
}

func (dn *Daemon) nodeHasAnnotation(annoKey string, value string) bool {
// TODO(e0ne): re-use cluster.NodeHasAnnotation function
// Check if node already contains annotation
if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == value) {
return true
}
return false
}

// isNodeDraining: check if the node is draining
// both Draining and MCP paused labels will return true
func (dn *Daemon) isNodeDraining() bool {
Expand All @@ -596,7 +567,7 @@ func (dn *Daemon) isNodeDraining() bool {
return false
}

return anno == annoDraining || anno == annoMcpPaused
return anno == consts.AnnoDraining || anno == consts.AnnoMcpPaused
}

func (dn *Daemon) completeDrain() error {
Expand All @@ -615,7 +586,7 @@ func (dn *Daemon) completeDrain() error {
}
}

if err := dn.annotateNode(dn.name, annoIdle); err != nil {
if err := dn.annotateNode(dn.name, consts.AnnoIdle); err != nil {
glog.Errorf("completeDrain(): failed to annotate node: %v", err)
return err
}
Expand Down Expand Up @@ -764,7 +735,7 @@ func (dn *Daemon) getNodeMachinePool() error {

func (dn *Daemon) applyDrainRequired() error {
glog.V(2).Info("applyDrainRequired(): no other node is draining")
err := dn.annotateNode(dn.name, annoDrainRequired)
err := dn.annotateNode(dn.name, consts.AnnoDrainRequired)
if err != nil {
glog.Errorf("applyDrainRequired(): Failed to annotate node: %v", err)
return err
Expand All @@ -783,7 +754,7 @@ func (dn *Daemon) pauseMCP() error {

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused
paused := dn.node.Annotations[consts.NodeDrainAnnotation] == consts.AnnoMcpPaused

mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
Expand Down Expand Up @@ -816,7 +787,7 @@ func (dn *Daemon) pauseMCP() error {
glog.V(2).Infof("pauseMCP(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoMcpPaused)
err = dn.annotateNode(dn.name, consts.AnnoMcpPaused)
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
return
Expand All @@ -832,7 +803,7 @@ func (dn *Daemon) pauseMCP() error {
glog.V(2).Infof("pauseMCP(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoDraining)
err = dn.annotateNode(dn.name, consts.AnnoDraining)
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
return
Expand Down

0 comments on commit d525d52

Please sign in to comment.