Skip to content

Commit

Permalink
Implement Drain controller
Browse files Browse the repository at this point in the history
  • Loading branch information
e0ne committed Mar 30, 2023
1 parent 84a9dd3 commit c9cf22b
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 1 deletion.
132 changes: 132 additions & 0 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package controllers

import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
)

const (
// Note: if a different logger is used than zap (operator-sdk default), these values would probably need to change.
LogLevelError = iota - 2
LogLevelWarning
LogLevelInfo
LogLevelDebug
)

type DrainReconciler struct {
client.Client
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=sriovnetwork.openshift.io,resources=sriovoperatorconfigs,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the SriovIBNetwork object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// The SriovNetwork CR shall only be defined in operator namespace.
req.Namespace = namespace
reqLogger := log.FromContext(ctx).WithValues("drain", req.NamespacedName)
reqLogger.Info("Reconciling Drain")

config := &sriovnetworkv1.SriovOperatorConfig{}
err := dr.Get(ctx, types.NamespacedName{
Name: constants.DefaultConfigName, Namespace: namespace}, config)
if err != nil {
reqLogger.V(LogLevelError).Info("Error occurred on GET SriovOperatorConfig request from API server.", "error:", err)
return reconcile.Result{}, err
}

nodeList := &corev1.NodeList{}
err = dr.List(context.TODO(), nodeList)
if err != nil {
// Failed to get node list
reqLogger.V(LogLevelError).Info("Error occurred on LIST nodes request from API server.", "error:", err)
return reconcile.Result{}, err
}

//config.Spec.MaxParallelNodeConfiguration

drainingNodes := 0
for _, node := range nodeList.Items {
if utils.NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Draining") {
drainingNodes++
}
}

if drainingNodes == config.Spec.MaxParallelNodeConfiguration {
return reconcile.Result{}, nil
}

for _, node := range nodeList.Items {

if utils.NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Drain_Required") {
if drainingNodes < config.Spec.MaxParallelNodeConfiguration {
node.Annotations["sriovnetwork.openshift.io/state"] = "Draining"
if err := dr.Update(ctx, &node); err != nil {
return reconcile.Result{}, err
}
drainingNodes++
} else {
return reconcile.Result{}, nil
}
}
}

return reconcile.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *DrainReconciler) SetupWithManager(mgr ctrl.Manager) error {
// we always add object with a same(static) key to the queue to reduce
// reconciliation count
qHandler := func(q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: "drain-reconcile-namespace",
Name: "drain-upgrade-reconcile-name",
}})
}

createUpdateEnqueue := handler.Funcs{
CreateFunc: func(e event.CreateEvent, q workqueue.RateLimitingInterface) {
qHandler(q)
},
UpdateFunc: func(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
qHandler(q)
},
}

// Watch for spec and annotation changes
nodePredicates := builder.WithPredicates(predicate.AnnotationChangedPredicate{})

return ctrl.NewControllerManagedBy(mgr).
For(&sriovnetworkv1.SriovOperatorConfig{}).
// TODO(e0ne): set MaxParallelNodeConfiguration to 1 by default
//WithOptions(controller.Options{MaxParallelNodeConfiguration: 1}).
Watches(&source.Kind{Type: &corev1.Node{}}, createUpdateEnqueue, nodePredicates).
Complete(r)
}
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "SriovNetworkPoolConfig")
os.Exit(1)
}
if err = (&controllers.DrainReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "SriovNetworkPoolConfig")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

// Create a default SriovNetworkNodePolicy
Expand Down
10 changes: 9 additions & 1 deletion pkg/utils/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func IsSingleNodeCluster(c client.Client) (bool, error) {
return k8sSingleNodeClusterStatus(c)
}

// IsExternalControlPlaneCluster detects control plane location of the cluster.
// IsExternalControlPlaneClustr detects control plane location of the cluster.
// On OpenShift, the control plane topology is configured in configv1.Infrastucture struct.
// On kubernetes, it is determined by which node the sriov operator is scheduled on. If operator
// pod is schedule on worker node, it is considered as external control plane.
Expand Down Expand Up @@ -111,3 +111,11 @@ func openshiftControlPlaneTopologyStatus(c client.Client) (configv1.TopologyMode
}
return infra.Status.ControlPlaneTopology, nil
}

func NodeHasAnnotation(node corev1.Node, annoKey string, value string) bool {
// Check if node already contains annotation
if anno, ok := node.Annotations[annoKey]; ok && (anno == value) {
return true
}
return false
}

0 comments on commit c9cf22b

Please sign in to comment.