Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel SR-IOV configuration #427

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/v1/sriovoperatorconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ type SriovOperatorConfigSpec struct {
// Default mode: daemon
// +kubebuilder:validation:Enum=daemon;systemd
ConfigurationMode ConfigurationModeType `json:"configurationMode,omitempty"`
// How many nodes can be configured in parallel
// 0 means no limit, all nodes will be configured in parallel
// +kubebuilder:default:=1
// +kubebuilder:validation:Minimum:=0
MaxParallelNodeConfiguration int `json:"maxParallelNodeConfiguration,omitempty"`
}

// SriovOperatorConfigStatus defines the observed state of SriovOperatorConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ spec:
maximum: 2
minimum: 0
type: integer
maxParallelNodeConfiguration:
default: 1
description: How many nodes can be configured in parallel 0 means
no limit, all nodes will be configured in parallel
minimum: 0
type: integer
type: object
status:
description: SriovOperatorConfigStatus defines the observed state of SriovOperatorConfig
Expand Down
129 changes: 129 additions & 0 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package controllers

import (
"context"
"fmt"
"sort"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
)

type DrainReconciler struct {
client.Client
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=sriovnetwork.openshift.io,resources=sriovoperatorconfigs,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
adrianchiris marked this conversation as resolved.
Show resolved Hide resolved
// move the current state of the cluster closer to the desired state.
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
req.Namespace = namespace
reqLogger := log.FromContext(ctx).WithValues("drain", req.NamespacedName)
adrianchiris marked this conversation as resolved.
Show resolved Hide resolved
reqLogger.Info("Reconciling Drain")

config := &sriovnetworkv1.SriovOperatorConfig{}
err := dr.Get(ctx, types.NamespacedName{
Name: constants.DefaultConfigName, Namespace: namespace}, config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fail when you enqueue a reconcile request with "drain-reconcile-namespace" namespace.

perhaps you need to identify this type of request first, or enqueue with sriov operator namespace so namespace field in request will be correct.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err != nil {
reqLogger.Error(err, "Error occurred on GET SriovOperatorConfig request from API server.")
return reconcile.Result{}, err
}

nodeList := &corev1.NodeList{}
err = dr.List(ctx, nodeList)
if err != nil {
// Failed to get node list
reqLogger.Error(err, "Error occurred on LIST nodes request from API server")
return reconcile.Result{}, err
}

// sort nodeList to iterate in the same order each reconcile loop
sort.Slice(nodeList.Items, func(i, j int) bool {
return strings.Compare(nodeList.Items[i].Name, nodeList.Items[j].Name) == -1
})

reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", config.Spec.MaxParallelNodeConfiguration)

drainingNodes := 0
for _, node := range nodeList.Items {
if utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.AnnoDraining) || utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.AnnoMcpPaused) {
drainingNodes++
}
}

adrianchiris marked this conversation as resolved.
Show resolved Hide resolved
reqLogger.Info("Count of draining", "drainingNodes", drainingNodes)
if config.Spec.MaxParallelNodeConfiguration != 0 && drainingNodes >= config.Spec.MaxParallelNodeConfiguration {
reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes")
return reconcile.Result{}, nil
e0ne marked this conversation as resolved.
Show resolved Hide resolved
}

for _, node := range nodeList.Items {
if !utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.AnnoDrainRequired) {
continue
}
if config.Spec.MaxParallelNodeConfiguration == 0 || drainingNodes < config.Spec.MaxParallelNodeConfiguration {
reqLogger.Info("Start draining node", "node", node.Name)
patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, constants.NodeDrainAnnotation, constants.AnnoDraining))
err = dr.Client.Patch(context.TODO(), &node, client.RawPatch(types.StrategicMergePatchType, patch))
if err != nil {
reqLogger.Error(err, "Failed to patch node annotations")
return reconcile.Result{}, err
}
drainingNodes++
} else {
reqLogger.Info("Too many nodes to be draining at the moment. Skipping node %s", "node", node.Name)
return reconcile.Result{}, nil
}
}

return reconcile.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (dr *DrainReconciler) SetupWithManager(mgr ctrl.Manager) error {
// we always add object with a same(static) key to the queue to reduce
// reconciliation count
qHandler := func(q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: namespace,
Name: "drain-upgrade-reconcile-name",
}})
}

createUpdateEnqueue := handler.Funcs{
CreateFunc: func(e event.CreateEvent, q workqueue.RateLimitingInterface) {
qHandler(q)
},
UpdateFunc: func(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
qHandler(q)
},
}

// Watch for spec and annotation changes
nodePredicates := builder.WithPredicates(DrainAnnotationPredicate{})

return ctrl.NewControllerManagedBy(mgr).
For(&sriovnetworkv1.SriovOperatorConfig{}).
Watches(&source.Kind{Type: &corev1.Node{}}, createUpdateEnqueue, nodePredicates).
Complete(dr)
}
120 changes: 120 additions & 0 deletions controllers/drain_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package controllers

import (
goctx "context"
"time"

v1 "k8s.io/api/core/v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
consts "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util"
)

func createNodeObj(name, anno string) *v1.Node {
node := &v1.Node{}
node.Name = name
node.Annotations = map[string]string{}
node.Annotations[consts.NodeDrainAnnotation] = anno

return node
}

func createNode(node *v1.Node) {
Expect(k8sClient.Create(goctx.TODO(), node)).Should(Succeed())
}

var _ = Describe("Drain Controller", func() {

BeforeEach(func() {
node1 := createNodeObj("node1", "Drain_Required")
node2 := createNodeObj("node2", "Drain_Required")
createNode(node1)
createNode(node2)
})
AfterEach(func() {
node1 := createNodeObj("node1", "Drain_Required")
node2 := createNodeObj("node2", "Drain_Required")
err := k8sClient.Delete(goctx.TODO(), node1)
Expect(err).NotTo(HaveOccurred())
err = k8sClient.Delete(goctx.TODO(), node2)
Expect(err).NotTo(HaveOccurred())
})

Context("Parallel nodes draining", func() {

It("Should drain one node", func() {
config := &sriovnetworkv1.SriovOperatorConfig{}
err := util.WaitForNamespacedObject(config, k8sClient, testNamespace, "default", interval, timeout)
Expect(err).NotTo(HaveOccurred())
config.Spec = sriovnetworkv1.SriovOperatorConfigSpec{
EnableInjector: func() *bool { b := true; return &b }(),
EnableOperatorWebhook: func() *bool { b := true; return &b }(),
MaxParallelNodeConfiguration: 1,
}
updateErr := k8sClient.Update(goctx.TODO(), config)
Expect(updateErr).NotTo(HaveOccurred())
time.Sleep(3 * time.Second)

nodeList := &v1.NodeList{}
listErr := k8sClient.List(ctx, nodeList)
Expect(listErr).NotTo(HaveOccurred())

drainingNodes := 0
for _, node := range nodeList.Items {
if utils.NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Draining") {
drainingNodes++
}
}
Expect(drainingNodes).To(Equal(1))
})

It("Should drain two nodes", func() {
config := &sriovnetworkv1.SriovOperatorConfig{}
err := util.WaitForNamespacedObject(config, k8sClient, testNamespace, "default", interval, timeout)
Expect(err).NotTo(HaveOccurred())
config.Spec = sriovnetworkv1.SriovOperatorConfigSpec{
EnableInjector: func() *bool { b := true; return &b }(),
EnableOperatorWebhook: func() *bool { b := true; return &b }(),
MaxParallelNodeConfiguration: 2,
}
updateErr := k8sClient.Update(goctx.TODO(), config)
Expect(updateErr).NotTo(HaveOccurred())
time.Sleep(3 * time.Second)

nodeList := &v1.NodeList{}
listErr := k8sClient.List(ctx, nodeList)
Expect(listErr).NotTo(HaveOccurred())

for _, node := range nodeList.Items {
Expect(utils.NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Draining")).To(BeTrue())
}
})

It("Should drain all nodes", func() {
config := &sriovnetworkv1.SriovOperatorConfig{}
err := util.WaitForNamespacedObject(config, k8sClient, testNamespace, "default", interval, timeout)
Expect(err).NotTo(HaveOccurred())
config.Spec = sriovnetworkv1.SriovOperatorConfigSpec{
EnableInjector: func() *bool { b := true; return &b }(),
EnableOperatorWebhook: func() *bool { b := true; return &b }(),
MaxParallelNodeConfiguration: 0,
}
updateErr := k8sClient.Update(goctx.TODO(), config)
Expect(updateErr).NotTo(HaveOccurred())
time.Sleep(3 * time.Second)

nodeList := &v1.NodeList{}
listErr := k8sClient.List(ctx, nodeList)
Expect(listErr).NotTo(HaveOccurred())

for _, node := range nodeList.Items {
Expect(utils.NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Draining")).To(BeTrue())
}
})
})
})
47 changes: 47 additions & 0 deletions controllers/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package controllers

import (
"bytes"
"context"
"encoding/json"
"os"
"strings"

"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
)

Expand All @@ -40,6 +45,48 @@ const (

var namespace = os.Getenv("NAMESPACE")

type DrainAnnotationPredicate struct {
predicate.Funcs
}

func (DrainAnnotationPredicate) Create(e event.CreateEvent) bool {
logger := log.FromContext(context.TODO())
if e.Object == nil {
logger.Error(nil, "Create event has no object for create", "event", e)
return false
}

if _, hasAnno := e.Object.GetAnnotations()[constants.NodeDrainAnnotation]; hasAnno {
logger.Error(nil, "Create event: node has no drain annotation", "event", e)
return true
}
return false
}

func (DrainAnnotationPredicate) Update(e event.UpdateEvent) bool {
logger := log.FromContext(context.TODO())
if e.ObjectOld == nil {
logger.Error(nil, "Update event has no old object to update", "event", e)
return false
}
if e.ObjectNew == nil {
logger.Error(nil, "Update event has no new object for update", "event", e)
return false
}

oldAnno, hasOldAnno := e.ObjectOld.GetAnnotations()[constants.NodeDrainAnnotation]
newAnno, hasNewAnno := e.ObjectNew.GetAnnotations()[constants.NodeDrainAnnotation]

if !hasOldAnno || !hasNewAnno {
logger.Error(nil, "Update event: can not compare annotations", "old", hasOldAnno)
logger.Error(nil, "Update event: can not compare annotations", "new", hasNewAnno)
logger.Error(nil, "Update event: can not compare annotations", "event", e)
return false
}

return oldAnno == newAnno
}

func GetImagePullSecrets() []string {
imagePullSecrets := os.Getenv("IMAGE_PULL_SECRETS")
if imagePullSecrets != "" {
Expand Down
6 changes: 6 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ var _ = BeforeSuite(func(done Done) {
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

err = (&DrainReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

os.Setenv("RESOURCE_PREFIX", "openshift.io")
os.Setenv("NAMESPACE", "openshift-sriov-network-operator")
os.Setenv("ENABLE_ADMISSION_CONTROLLER", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ spec:
maximum: 2
minimum: 0
type: integer
maxParallelNodeConfiguration:
default: 1
description: How many nodes can be configured in parallel 0 means
no limit, all nodes will be configured in parallel
minimum: 0
type: integer
type: object
status:
description: SriovOperatorConfigStatus defines the observed state of SriovOperatorConfig
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "SriovNetworkPoolConfig")
os.Exit(1)
}
if err = (&controllers.DrainReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DrainReconciler")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

// Create a default SriovNetworkNodePolicy
Expand Down
6 changes: 6 additions & 0 deletions pkg/consts/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ const (
DPConfigFileName = "config.json"
OVSHWOLMachineConfigNameSuffix = "ovs-hw-offload"

NodeDrainAnnotation = "sriovnetwork.openshift.io/state"
AnnoIdle = "Idle"
AnnoDrainRequired = "Drain_Required"
AnnoMcpPaused = "Draining_MCP_Paused"
AnnoDraining = "Draining"

LinkTypeEthernet = "ether"
LinkTypeInfiniband = "infiniband"

Expand Down
Loading
Loading