diff --git a/controllers/drain_controller.go b/controllers/drain_controller.go
new file mode 100644
index 0000000000..024abaa9bb
--- /dev/null
+++ b/controllers/drain_controller.go
@@ -0,0 +1,132 @@
+package controllers
+
+import (
+	"context"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/workqueue"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+
+	sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
+	constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
+	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
+)
+
+const (
+	// Note: if a logger other than zap (the operator-sdk default) is used, these values will likely need to change.
+	LogLevelError = iota - 2
+	LogLevelWarning
+	LogLevelInfo
+	LogLevelDebug
+)
+
+type DrainReconciler struct {
+	client.Client
+	Scheme *runtime.Scheme
+}
+
+//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
+//+kubebuilder:rbac:groups=sriovnetwork.openshift.io,resources=sriovoperatorconfigs,verbs=get;list;watch
+
+// Reconcile is part of the main kubernetes reconciliation loop which aims to
+// move the current state of the cluster closer to the desired state.
+// It counts the nodes that are currently draining and promotes nodes
+// annotated Drain_Required to Draining until the
+// Spec.MaxParallelNodeConfiguration limit of the default SriovOperatorConfig
+// is reached.
+//
+// For more details, check Reconcile and its Result here:
+// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
+func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	// The SriovOperatorConfig CR shall only be defined in the operator namespace.
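+	// Node events are enqueued under a single static key (see
+	// SetupWithManager), so normalize the request namespace to the operator
+	// namespace before reading the default SriovOperatorConfig.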
+	req.Namespace = namespace
+	reqLogger := log.FromContext(ctx).WithValues("drain", req.NamespacedName)
+	reqLogger.Info("Reconciling Drain")
+
+	config := &sriovnetworkv1.SriovOperatorConfig{}
+	err := dr.Get(ctx, types.NamespacedName{
+		Name: constants.DefaultConfigName, Namespace: namespace}, config)
+	if err != nil {
+		reqLogger.V(LogLevelError).Info("Error occurred on GET SriovOperatorConfig request from API server.", "error", err)
+		return reconcile.Result{}, err
+	}
+
+	nodeList := &corev1.NodeList{}
+	err = dr.List(ctx, nodeList)
+	if err != nil {
+		// Failed to get node list
+		reqLogger.V(LogLevelError).Info("Error occurred on LIST nodes request from API server.", "error", err)
+		return reconcile.Result{}, err
+	}
+
+	// Count the nodes that are already draining.
+	drainingNodes := 0
+	for _, node := range nodeList.Items {
+		if utils.NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Draining") {
+			drainingNodes++
+		}
+	}
+
+	if drainingNodes >= config.Spec.MaxParallelNodeConfiguration {
+		// The parallel-drain budget is already spent; wait for a node to finish.
+		return reconcile.Result{}, nil
+	}
+
+	for _, node := range nodeList.Items {
+		if utils.NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Drain_Required") {
+			if drainingNodes < config.Spec.MaxParallelNodeConfiguration {
+				node.Annotations["sriovnetwork.openshift.io/state"] = "Draining"
+				if err := dr.Update(ctx, &node); err != nil {
+					return reconcile.Result{}, err
+				}
+				drainingNodes++
+			} else {
+				return reconcile.Result{}, nil
+			}
+		}
+	}
+
+	return reconcile.Result{}, nil
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (dr *DrainReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	// We always enqueue the same (static) key so that concurrent events
+	// collapse into a single reconciliation.
+	qHandler := func(q workqueue.RateLimitingInterface) {
+		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
+			Namespace: "drain-reconcile-namespace",
+			Name:      "drain-upgrade-reconcile-name",
+		}})
+	}
+
+	createUpdateEnqueue := handler.Funcs{
+		CreateFunc: func(e event.CreateEvent, q workqueue.RateLimitingInterface) {
+			qHandler(q)
+		},
+		UpdateFunc: func(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
+			qHandler(q)
+		},
+	}
+
+	// Only react to node updates that change annotations
+	nodePredicates := builder.WithPredicates(predicate.AnnotationChangedPredicate{})
+
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&sriovnetworkv1.SriovOperatorConfig{}).
+		// TODO(e0ne): set MaxConcurrentReconciles to 1 by default
+		//WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
+		Watches(&source.Kind{Type: &corev1.Node{}}, createUpdateEnqueue, nodePredicates).
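+		// Node create/update events all map to the static key above, so a
+		// burst of node changes is deduplicated into one queued request.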
+		Complete(dr)
+}
diff --git a/main.go b/main.go
index f14dee49ce..9f82e48b03 100644
--- a/main.go
+++ b/main.go
@@ -164,6 +164,13 @@ func main() {
 		setupLog.Error(err, "unable to create controller", "controller", "SriovNetworkPoolConfig")
 		os.Exit(1)
 	}
+	if err = (&controllers.DrainReconciler{
+		Client: mgr.GetClient(),
+		Scheme: mgr.GetScheme(),
+	}).SetupWithManager(mgr); err != nil {
+		setupLog.Error(err, "unable to create controller", "controller", "Drain")
+		os.Exit(1)
+	}
 	// +kubebuilder:scaffold:builder
 
 	// Create a default SriovNetworkNodePolicy
diff --git a/pkg/utils/cluster.go b/pkg/utils/cluster.go
index 692afe9cd4..425f475dd8 100644
--- a/pkg/utils/cluster.go
+++ b/pkg/utils/cluster.go
@@ -48,7 +48,7 @@ func IsSingleNodeCluster(c client.Client) (bool, error) {
 	return k8sSingleNodeClusterStatus(c)
 }
 
-// IsExternalControlPlaneClustr detects control plane location of the cluster.
+// IsExternalControlPlaneCluster detects control plane location of the cluster.
 // On OpenShift, the control plane topology is configured in configv1.Infrastucture struct.
 // On kubernetes, it is determined by which node the sriov operator is scheduled on. If operator
 // pod is schedule on worker node, it is considered as external control plane.
@@ -111,3 +111,9 @@ func openshiftControlPlaneTopologyStatus(c client.Client) (configv1.TopologyMode
 	}
 	return infra.Status.ControlPlaneTopology, nil
 }
+
+// NodeHasAnnotation checks whether the node carries annoKey with the expected value.
+func NodeHasAnnotation(node corev1.Node, annoKey string, value string) bool {
+	anno, ok := node.Annotations[annoKey]
+	return ok && anno == value
+}
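For reference, a minimal sketch of how the new NodeHasAnnotation helper behaves (a hypothetical test, not part of this diff; the annotation key and values mirror the ones used in drain_controller.go):

package utils

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestNodeHasAnnotation(t *testing.T) {
	node := corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{
				"sriovnetwork.openshift.io/state": "Draining",
			},
		},
	}

	// Matching key and value.
	if !NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Draining") {
		t.Error("expected annotation to match")
	}
	// Right key, wrong value.
	if NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Drain_Required") {
		t.Error("expected value mismatch to return false")
	}
	// Missing key on a node with nil annotations.
	if NodeHasAnnotation(corev1.Node{}, "sriovnetwork.openshift.io/state", "Draining") {
		t.Error("expected missing annotation to return false")
	}
}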