From ac239974e739c4750482379051dbdf4bdd296087 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 30 Oct 2023 10:27:36 -0400 Subject: [PATCH] Bump admiral and adjust to Federator API changes This also required further propagating ctx params to satisfy the contextcheck linter. Signed-off-by: Tom Pantelis --- go.mod | 12 +++---- go.sum | 26 +++++++-------- pkg/agent/controller/agent.go | 31 +++++++++++------- pkg/agent/controller/cleanup.go | 22 ++++++------- pkg/agent/controller/cleanup_test.go | 2 +- pkg/agent/controller/endpoint_slice.go | 32 +++++++++++-------- .../controller/service_endpoint_slices.go | 18 +++++------ pkg/agent/controller/service_export_client.go | 23 +++++++------ pkg/agent/controller/service_import.go | 28 ++++++++-------- .../controller/service_import_aggregator.go | 22 ++++++------- .../controller/service_import_migrator.go | 6 ++-- pkg/agent/main.go | 5 ++- 12 files changed, 122 insertions(+), 105 deletions(-) diff --git a/go.mod b/go.mod index 421669f62..8bcb65b75 100644 --- a/go.mod +++ b/go.mod @@ -8,15 +8,15 @@ require ( github.com/onsi/gomega v1.27.10 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 - github.com/submariner-io/admiral v0.16.0-m4.0.20231024075740-7ca36d2067a5 + github.com/submariner-io/admiral v0.16.0-m4.0.20231030142117-1885875b0274 github.com/submariner-io/shipyard v0.16.0-m4 github.com/uw-labs/lichen v0.1.7 - k8s.io/api v0.28.1 - k8s.io/apimachinery v0.28.1 - k8s.io/client-go v0.28.1 + k8s.io/api v0.28.3 + k8s.io/apimachinery v0.28.3 + k8s.io/client-go v0.28.3 k8s.io/klog/v2 v2.100.1 k8s.io/utils v0.0.0-20230726121419-3b25d923346b - sigs.k8s.io/controller-runtime v0.16.2 + sigs.k8s.io/controller-runtime v0.16.3 sigs.k8s.io/mcs-api v0.1.0 ) @@ -25,7 +25,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect diff --git a/go.sum b/go.sum index 334babb78..30950f87a 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= -github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= -github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= +github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -405,8 +405,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= -github.com/submariner-io/admiral v0.16.0-m4.0.20231024075740-7ca36d2067a5 h1:r/wA9Suzyfxpt8LMnBhZDjXLzIBpjAwcBt5FLL//cMU= -github.com/submariner-io/admiral v0.16.0-m4.0.20231024075740-7ca36d2067a5/go.mod h1:bfpKC5z/0nOVjflOmGUkKirF3bOv5mZdRp9kOvBulAc= +github.com/submariner-io/admiral v0.16.0-m4.0.20231030142117-1885875b0274 h1:4sEKAoN2yHLbKJ6mYU1zjIEMZmfMChsXwx20iLDX7tA= +github.com/submariner-io/admiral v0.16.0-m4.0.20231030142117-1885875b0274/go.mod h1:RZU6vXegRHk6kbEvPpgJ3ivsHQjweMwLoEKb/s+YY24= github.com/submariner-io/shipyard v0.16.0-m4 h1:UhxS3w3C+c2kVUrJVH4VMjbhkrgTjzo8oPlo/ANbjvI= github.com/submariner-io/shipyard v0.16.0-m4/go.mod h1:4brXpjvD+OL3/hd8+laET47FeoOsQzkQ74aprhEyfhE= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= @@ -619,21 +619,21 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78= k8s.io/api v0.18.4/go.mod h1:lOIQAKYgai1+vz9J7YcDZwC26Z0zQewYOGWdyIPUUQ4= -k8s.io/api v0.28.1 h1:i+0O8k2NPBCPYaMB+uCkseEbawEt/eFaiRqUx8aB108= -k8s.io/api v0.28.1/go.mod h1:uBYwID+66wiL28Kn2tBjBYQdEU0Xk0z5qF8bIBqk/Dg= +k8s.io/api v0.28.3 h1:Gj1HtbSdB4P08C8rs9AR94MfSGpRhJgsS+GF9V26xMM= +k8s.io/api v0.28.3/go.mod h1:MRCV/jr1dW87/qJnZ57U5Pak65LGmQVkKTzf3AtKFHc= k8s.io/apiextensions-apiserver v0.18.2/go.mod h1:q3faSnRGmYimiocj6cHQ1I3WpLqmDgJFlKL37fC4ZvY= k8s.io/apiextensions-apiserver v0.18.4/go.mod h1:NYeyeYq4SIpFlPxSAB6jHPIdvu3hL0pc36wuRChybio= -k8s.io/apiextensions-apiserver v0.28.0 h1:CszgmBL8CizEnj4sj7/PtLGey6Na3YgWyGCPONv7E9E= +k8s.io/apiextensions-apiserver v0.28.3 h1:Od7DEnhXHnHPZG+W9I97/fSQkVpVPQx2diy+2EtmY08= k8s.io/apimachinery v0.18.2/go.mod h1:9SnR/e11v5IbyPCGbvJViimtJ0SwHG4nfZFjU77ftcA= k8s.io/apimachinery v0.18.4/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko= -k8s.io/apimachinery v0.28.1 h1:EJD40og3GizBSV3mkIoXQBsws32okPOy+MkRyzh6nPY= -k8s.io/apimachinery v0.28.1/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= +k8s.io/apimachinery v0.28.3 h1:B1wYx8txOaCQG0HmYF6nbpU8dg6HvA06x5tEffvOe7A= +k8s.io/apimachinery v0.28.3/go.mod h1:uQTKmIqs+rAYaq+DFaoD2X7pcjLOqbQX2AOiO0nIpb8= k8s.io/apiserver v0.18.2/go.mod h1:Xbh066NqrZO8cbsoenCwyDJ1OSi8Ag8I2lezeHxzwzw= k8s.io/apiserver v0.18.4/go.mod h1:q+zoFct5ABNnYkGIaGQ3bcbUNdmPyOCoEBcg51LChY8= k8s.io/client-go v0.18.2/go.mod h1:Xcm5wVGXX9HAA2JJ2sSBUn3tCJ+4SVlCbl2MNNv+CIU= k8s.io/client-go v0.18.4/go.mod h1:f5sXwL4yAZRkAtzOxRWUhA/N8XzGCb+nPZI8PfobZ9g= -k8s.io/client-go v0.28.1 h1:pRhMzB8HyLfVwpngWKE8hDcXRqifh1ga2Z/PU9SXVK8= -k8s.io/client-go v0.28.1/go.mod h1:pEZA3FqOsVkCc07pFVzK076R+P/eXqsgx5zuuRWukNE= +k8s.io/client-go v0.28.3 h1:2OqNb72ZuTZPKCl+4gTKvqao0AMOl9f3o2ijbAj3LI4= +k8s.io/client-go v0.28.3/go.mod h1:LTykbBp9gsA7SwqirlCXBWtK0guzfhpoW4qSm7i9dxo= k8s.io/code-generator v0.18.2/go.mod h1:+UHX5rSbxmR8kzS+FAv7um6dtYrZokQvjHpDSYRVkTc= k8s.io/code-generator v0.18.4/go.mod h1:TgNEVx9hCyPGpdtCWA34olQYLkh3ok9ar7XfSsr8b6c= k8s.io/component-base v0.18.2/go.mod h1:kqLlMuhJNHQ9lz8Z7V5bxUUtjFZnrypArGl58gmDfUM= @@ -656,8 +656,8 @@ k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSn k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.7/go.mod h1:PHgbrJT7lCHcxMU+mDHEm+nx46H4zuuHZkDP6icnhu0= sigs.k8s.io/controller-runtime v0.6.1/go.mod h1:XRYBPdbf5XJu9kpS84VJiZ7h/u1hF3gEORz0efEja7A= -sigs.k8s.io/controller-runtime v0.16.2 h1:mwXAVuEk3EQf478PQwQ48zGOXvW27UJc8NHktQVuIPU= -sigs.k8s.io/controller-runtime v0.16.2/go.mod h1:vpMu3LpI5sYWtujJOa2uPK61nB5rbwlN7BAB8aSLvGU= +sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= +sigs.k8s.io/controller-runtime v0.16.3/go.mod h1:j7bialYoSn142nv9sCOJmQgDXQXxnroFU4VnX/brVJ0= sigs.k8s.io/controller-tools v0.3.0/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index ade0a91a8..bc3a625ed 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -19,6 +19,7 @@ limitations under the License. package controller import ( + "context" "encoding/json" "fmt" "reflect" @@ -179,6 +180,8 @@ func (a *Controller) Start(stopCh <-chan struct{}) error { func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { svcExport := obj.(*mcsv1a1.ServiceExport) + ctx := context.Background() + logger.V(log.DEBUG).Infof("ServiceExport %s/%s %sd", svcExport.Namespace, svcExport.Name, op) if op == syncer.Delete { @@ -188,8 +191,9 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op obj, found, err := a.serviceSyncer.GetResource(svcExport.Name, svcExport.Namespace) if err != nil { // some other error. Log and requeue - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionUnknown, "ServiceRetrievalFailed", fmt.Sprintf("Error retrieving the Service: %v", err))) + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionUnknown, "ServiceRetrievalFailed", + fmt.Sprintf("Error retrieving the Service: %v", err))) logger.Errorf(err, "Error retrieving Service %s/%s", svcExport.Namespace, svcExport.Name) return nil, true @@ -197,8 +201,9 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op if !found { logger.V(log.DEBUG).Infof("Service to be exported (%s/%s) doesn't exist", svcExport.Namespace, svcExport.Name) - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionFalse, serviceUnavailable, "Service to be exported doesn't exist")) + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, serviceUnavailable, + "Service to be exported doesn't exist")) return nil, false } @@ -208,11 +213,12 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op svcType, ok := getServiceImportType(svc) if !ok { - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionFalse, invalidServiceType, fmt.Sprintf("Service of type %v not supported", svc.Spec.Type))) + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, invalidServiceType, + fmt.Sprintf("Service of type %v not supported", svc.Spec.Type))) logger.Errorf(nil, "Service type %q not supported for Service (%s/%s)", svc.Spec.Type, svcExport.Namespace, svcExport.Name) - err = a.localServiceImportFederator.Delete(a.newServiceImport(svcExport.Name, svcExport.Namespace)) + err = a.localServiceImportFederator.Delete(ctx, a.newServiceImport(svcExport.Name, svcExport.Namespace)) if err == nil || apierrors.IsNotFound(err) { return nil, false } @@ -246,7 +252,7 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op logger.V(log.DEBUG).Infof("Service to be exported (%s/%s) doesn't have a global IP yet", svcExport.Namespace, svcExport.Name) // Globalnet enabled but service doesn't have globalIp yet, Update the status and requeue - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, reason, msg)) return nil, true @@ -260,8 +266,8 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op serviceImport.Spec.Ports = a.getPortsForService(svc) } - a.serviceExportClient.updateStatusConditions(svcExport.Name, svcExport.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionTrue, "", "")) + a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionTrue, "", "")) logger.V(log.DEBUG).Infof("Returning ServiceImport %s/%s: %s", svcExport.Namespace, svcExport.Name, serviceImportStringer{serviceImport}) @@ -329,8 +335,9 @@ func (a *Controller) serviceToRemoteServiceImport(obj runtime.Object, _ int, op serviceImport := a.newServiceImport(svc.Name, svc.Namespace) // Update the status and requeue - a.serviceExportClient.updateStatusConditions(svc.Name, svc.Namespace, newServiceExportCondition(mcsv1a1.ServiceExportValid, - corev1.ConditionFalse, serviceUnavailable, "Service to be exported doesn't exist")) + a.serviceExportClient.updateStatusConditions(context.Background(), svc.Name, svc.Namespace, + newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, serviceUnavailable, + "Service to be exported doesn't exist")) return serviceImport, false } diff --git a/pkg/agent/controller/cleanup.go b/pkg/agent/controller/cleanup.go index 6ece41726..af82d7977 100644 --- a/pkg/agent/controller/cleanup.go +++ b/pkg/agent/controller/cleanup.go @@ -48,12 +48,12 @@ var ( } ) -func (a *Controller) Cleanup() error { +func (a *Controller) Cleanup(ctx context.Context) error { // Delete all ServiceImports from the local cluster skipping those in the broker namespace if the broker is on the // local cluster. siClient := a.serviceImportController.localClient.Resource(serviceImportGVR) - list, err := listResources(siClient, metav1.NamespaceAll, + list, err := listResources(ctx, siClient, metav1.NamespaceAll, &metav1.ListOptions{ FieldSelector: fields.OneTermNotEqualSelector("metadata.namespace", a.serviceImportController.serviceImportAggregator.brokerNamespace).String(), @@ -65,13 +65,13 @@ func (a *Controller) Cleanup() error { for i := range list { _, ok := list[i].GetLabels()[mcsv1a1.LabelServiceName] if ok { - err = a.serviceImportController.Delete(&list[i]) + err = a.serviceImportController.Delete(ctx, &list[i]) if err != nil && !apierrors.IsNotFound(err) { return err } } - err = siClient.Namespace(list[i].GetNamespace()).Delete(context.TODO(), list[i].GetName(), metav1.DeleteOptions{}) + err = siClient.Namespace(list[i].GetNamespace()).Delete(ctx, list[i].GetName(), metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err //nolint:wrapcheck // Let the caller wrap @@ -80,7 +80,7 @@ func (a *Controller) Cleanup() error { // Delete all EndpointSlices from the local cluster skipping those in the broker namespace if the broker is on the // local cluster. - err = deleteResources(a.endpointSliceController.syncer.GetLocalClient().Resource(endpointSliceGVR), metav1.NamespaceAll, + err = deleteResources(ctx, a.endpointSliceController.syncer.GetLocalClient().Resource(endpointSliceGVR), metav1.NamespaceAll, &metav1.ListOptions{ FieldSelector: fields.OneTermNotEqualSelector("metadata.namespace", a.serviceImportController.serviceImportAggregator.brokerNamespace).String(), @@ -91,7 +91,7 @@ func (a *Controller) Cleanup() error { } // Delete all local EndpointSlices from the broker. - err = deleteResources(a.endpointSliceController.syncer.GetBrokerClient().Resource(endpointSliceGVR), + err = deleteResources(ctx, a.endpointSliceController.syncer.GetBrokerClient().Resource(endpointSliceGVR), a.endpointSliceController.syncer.GetBrokerNamespace(), &metav1.ListOptions{ LabelSelector: labels.Set(map[string]string{constants.MCSLabelSourceCluster: a.clusterID}).String(), @@ -100,14 +100,14 @@ func (a *Controller) Cleanup() error { return errors.Wrap(err, "error deleting remote EndpointSlices") } -func deleteResources(client dynamic.NamespaceableResourceInterface, ns string, options *metav1.ListOptions) error { - list, err := listResources(client, ns, options) +func deleteResources(ctx context.Context, client dynamic.NamespaceableResourceInterface, ns string, options *metav1.ListOptions) error { + list, err := listResources(ctx, client, ns, options) if err != nil { return err } for i := range list { - err = client.Namespace(list[i].GetNamespace()).Delete(context.TODO(), list[i].GetName(), metav1.DeleteOptions{}) + err = client.Namespace(list[i].GetNamespace()).Delete(ctx, list[i].GetName(), metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err //nolint:wrapcheck // Let the caller wrap } @@ -116,10 +116,10 @@ func deleteResources(client dynamic.NamespaceableResourceInterface, ns string, o return nil } -func listResources(client dynamic.NamespaceableResourceInterface, ns string, +func listResources(ctx context.Context, client dynamic.NamespaceableResourceInterface, ns string, options *metav1.ListOptions, ) ([]unstructured.Unstructured, error) { - list, err := client.Namespace(ns).List(context.TODO(), *options) + list, err := client.Namespace(ns).List(ctx, *options) if err != nil && !apierrors.IsNotFound(err) { return nil, err //nolint:wrapcheck // Let the caller wrap } diff --git a/pkg/agent/controller/cleanup_test.go b/pkg/agent/controller/cleanup_test.go index bf993cd37..8d4c4b4c8 100644 --- a/pkg/agent/controller/cleanup_test.go +++ b/pkg/agent/controller/cleanup_test.go @@ -228,7 +228,7 @@ var _ = Describe("Cleanup", func() { }) It("should correctly remove local and remote ServiceImports and EndpointSlices", func() { - Expect(t.cluster1.agentController.Cleanup()).To(Succeed()) + Expect(t.cluster1.agentController.Cleanup(context.Background())).To(Succeed()) test.AwaitNoResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport1.Name) test.AwaitNoResource(t.cluster1.localServiceImportClient.Namespace(serviceNamespace), localAggregatedServiceImport1.Name) diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 1494915ed..0499eaeee 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -65,7 +65,7 @@ func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.Sy BrokerResourceType: &discovery.EndpointSlice{}, TransformBrokerToLocal: c.onRemoteEndpointSlice, OnSuccessfulSyncFromBroker: func(obj runtime.Object, op syncer.Operation) bool { - c.enqueueForConflictCheck(obj.(*discovery.EndpointSlice), op) + c.enqueueForConflictCheck(context.TODO(), obj.(*discovery.EndpointSlice), op) return false }, BrokerResyncPeriod: BrokerResyncPeriod, @@ -102,12 +102,13 @@ func (c *EndpointSliceController) start(stopCh <-chan struct{}) error { func (c *EndpointSliceController) onLocalEndpointSlice(obj runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { endpointSlice := obj.(*discovery.EndpointSlice) + ctx := context.TODO() if op != syncer.Delete && isLegacyEndpointSlice(endpointSlice) { logger.Infof("Found legacy EndpointSlice %s/%s - deleting it", endpointSlice.Namespace, endpointSlice.Name) - err := c.syncer.GetLocalFederator().Delete(endpointSlice) + err := c.syncer.GetLocalFederator().Delete(ctx, endpointSlice) if err != nil { logger.Errorf(err, "Error deleting legacy EndpointSlice %s/%s", endpointSlice.Namespace, endpointSlice.Name) } @@ -135,6 +136,7 @@ func (c *EndpointSliceController) onRemoteEndpointSlice(obj runtime.Object, _ in func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object, op syncer.Operation) bool { endpointSlice := obj.(*discovery.EndpointSlice) + ctx := context.TODO() serviceName := endpointSlice.Labels[mcsv1a1.LabelServiceName] serviceNamespace := endpointSlice.Labels[constants.LabelSourceNamespace] @@ -153,18 +155,20 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object, if op == syncer.Delete { if c.hasNoRemainingEndpointSlices(endpointSlice) { - err = c.serviceImportAggregator.updateOnDelete(serviceName, serviceNamespace) + err = c.serviceImportAggregator.updateOnDelete(ctx, serviceName, serviceNamespace) } } else { - err = c.serviceImportAggregator.updateOnCreateOrUpdate(serviceName, serviceNamespace) + err = c.serviceImportAggregator.updateOnCreateOrUpdate(ctx, serviceName, serviceNamespace) if err != nil { - c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, - corev1.ConditionFalse, exportFailedReason, fmt.Sprintf("Unable to export: %v", err))) + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceNamespace, + newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason, + fmt.Sprintf("Unable to export: %v", err))) } else { - c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, - corev1.ConditionTrue, "", "Service was successfully exported to the broker")) + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceNamespace, + newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionTrue, "", + "Service was successfully exported to the broker")) - c.enqueueForConflictCheck(endpointSlice, op) + c.enqueueForConflictCheck(ctx, endpointSlice, op) } } @@ -201,6 +205,8 @@ func (c *EndpointSliceController) hasNoRemainingEndpointSlices(endpointSlice *di } func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (bool, error) { + ctx := context.TODO() + localServiceExport := c.serviceExportClient.getLocalInstance(name, namespace) if localServiceExport == nil { return false, nil @@ -230,26 +236,26 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( } if conflict { - c.serviceExportClient.updateStatusConditions(name, namespace, newServiceExportCondition( + c.serviceExportClient.updateStatusConditions(ctx, name, namespace, newServiceExportCondition( mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, portConflictReason, fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+ "The service will expose the intersection of all the ports: %s", fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts)))) } else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil { - c.serviceExportClient.removeStatusCondition(name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason) + c.serviceExportClient.removeStatusCondition(ctx, name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason) } return false, nil } -func (c *EndpointSliceController) enqueueForConflictCheck(eps *discovery.EndpointSlice, op syncer.Operation) { +func (c *EndpointSliceController) enqueueForConflictCheck(ctx context.Context, eps *discovery.EndpointSlice, op syncer.Operation) { if eps.Labels[constants.LabelIsHeadless] != "false" { return } // Since the conflict checking works off of the local cache for efficiency, wait a little bit here for the local cache to be updated // with the latest state of the EndpointSlice. - _ = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 100*time.Millisecond, true, + _ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 100*time.Millisecond, true, func(_ context.Context) (bool, error) { _, found, _ := c.syncer.GetLocalResource(eps.Name, eps.Namespace, eps) return (op == syncer.Delete && !found) || (op != syncer.Delete && found), nil diff --git a/pkg/agent/controller/service_endpoint_slices.go b/pkg/agent/controller/service_endpoint_slices.go index 9c7d8d5d3..cc2eb138e 100644 --- a/pkg/agent/controller/service_endpoint_slices.go +++ b/pkg/agent/controller/service_endpoint_slices.go @@ -99,7 +99,7 @@ func (c *ServiceEndpointSliceController) stop() { }) } -func (c *ServiceEndpointSliceController) cleanup() (bool, error) { +func (c *ServiceEndpointSliceController) cleanup(ctx context.Context) (bool, error) { listOptions := metav1.ListOptions{ LabelSelector: k8slabels.SelectorFromSet(map[string]string{ discovery.LabelManagedBy: constants.LabelValueManagedBy, @@ -109,7 +109,7 @@ func (c *ServiceEndpointSliceController) cleanup() (bool, error) { }).String(), } - list, err := c.localClient.List(context.Background(), listOptions) + list, err := c.localClient.List(ctx, listOptions) if err != nil { return false, errors.Wrapf(err, "error listing the EndpointSlices associated with service %s/%s", c.serviceNamespace, c.serviceName) @@ -119,7 +119,7 @@ func (c *ServiceEndpointSliceController) cleanup() (bool, error) { return false, nil } - err = c.localClient.DeleteCollection(context.Background(), metav1.DeleteOptions{}, listOptions) + err = c.localClient.DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions) if err != nil && !apierrors.IsNotFound(err) { return false, errors.Wrapf(err, "error deleting the EndpointSlices associated with service %s/%s", @@ -155,7 +155,7 @@ func (c *ServiceEndpointSliceController) onServiceEndpointSlice(obj runtime.Obje } if op == syncer.Delete { - list, err := c.localClient.List(context.Background(), metav1.ListOptions{ + list, err := c.localClient.List(context.TODO(), metav1.ListOptions{ LabelSelector: k8slabels.SelectorFromSet(returnEPS.Labels).String(), }) if err != nil { @@ -320,18 +320,18 @@ func (c *ServiceEndpointSliceController) isHeadless() bool { return c.serviceImportSpec.Type == mcsv1a1.Headless } -func (c *ServiceEndpointSliceController) Distribute(obj runtime.Object) error { - return c.federator.Distribute(obj) //nolint:wrapcheck // No need to wrap here +func (c *ServiceEndpointSliceController) Distribute(ctx context.Context, obj runtime.Object) error { + return c.federator.Distribute(ctx, obj) //nolint:wrapcheck // No need to wrap here } -func (c *ServiceEndpointSliceController) Delete(obj runtime.Object) error { +func (c *ServiceEndpointSliceController) Delete(ctx context.Context, obj runtime.Object) error { if c.isHeadless() { - return c.federator.Delete(obj) //nolint:wrapcheck // No need to wrap here + return c.federator.Delete(ctx, obj) //nolint:wrapcheck // No need to wrap here } // For a non-headless service, we never delete the single exported EPS - we update its endpoint condition based on // the backend service EPS's as they are created/updated/deleted. - return c.Distribute(obj) + return c.Distribute(ctx, obj) } type endpointSliceStringer struct { diff --git a/pkg/agent/controller/service_export_client.go b/pkg/agent/controller/service_export_client.go index cf92132b5..3743f7b38 100644 --- a/pkg/agent/controller/service_export_client.go +++ b/pkg/agent/controller/service_export_client.go @@ -32,8 +32,10 @@ import ( mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) -func (c *ServiceExportClient) removeStatusCondition(name, namespace string, condType mcsv1a1.ServiceExportConditionType, reason string) { - c.doUpdate(name, namespace, func(toUpdate *mcsv1a1.ServiceExport) bool { +func (c *ServiceExportClient) removeStatusCondition(ctx context.Context, name, namespace string, + condType mcsv1a1.ServiceExportConditionType, reason string, +) { + c.doUpdate(ctx, name, namespace, func(toUpdate *mcsv1a1.ServiceExport) bool { condition := FindServiceExportStatusCondition(toUpdate.Status.Conditions, condType) if condition != nil && reflect.DeepEqual(condition.Reason, &reason) { logger.V(log.DEBUG).Infof("Removing status condition (Type: %q, Reason: %q) from ServiceExport (%s/%s)", @@ -51,11 +53,13 @@ func (c *ServiceExportClient) removeStatusCondition(name, namespace string, cond }) } -func (c *ServiceExportClient) updateStatusConditions(name, namespace string, conditions ...mcsv1a1.ServiceExportCondition) { - c.tryUpdateStatusConditions(name, namespace, true, conditions...) +func (c *ServiceExportClient) updateStatusConditions(ctx context.Context, name, namespace string, + conditions ...mcsv1a1.ServiceExportCondition, +) { + c.tryUpdateStatusConditions(ctx, name, namespace, true, conditions...) } -func (c *ServiceExportClient) tryUpdateStatusConditions(name, namespace string, canReplace bool, +func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, name, namespace string, canReplace bool, conditions ...mcsv1a1.ServiceExportCondition, ) { findStatusCondition := func(conditions []mcsv1a1.ServiceExportCondition, condType mcsv1a1.ServiceExportConditionType, @@ -71,7 +75,7 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(name, namespace string, return cond } - c.doUpdate(name, namespace, func(toUpdate *mcsv1a1.ServiceExport) bool { + c.doUpdate(ctx, name, namespace, func(toUpdate *mcsv1a1.ServiceExport) bool { updated := false for i := range conditions { @@ -100,9 +104,9 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(name, namespace string, }) } -func (c *ServiceExportClient) doUpdate(name, namespace string, update func(toUpdate *mcsv1a1.ServiceExport) bool) { +func (c *ServiceExportClient) doUpdate(ctx context.Context, name, namespace string, update func(toUpdate *mcsv1a1.ServiceExport) bool) { err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - obj, err := c.Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { logger.V(log.TRACE).Infof("ServiceExport (%s/%s) not found - unable to update status", namespace, name) return nil @@ -117,8 +121,7 @@ func (c *ServiceExportClient) doUpdate(name, namespace string, update func(toUpd return nil } - _, err = c.Namespace(toUpdate.Namespace).UpdateStatus(context.TODO(), - c.toUnstructured(toUpdate), metav1.UpdateOptions{}) + _, err = c.Namespace(toUpdate.Namespace).UpdateStatus(ctx, c.toUnstructured(toUpdate), metav1.UpdateOptions{}) return errors.Wrap(err, "error from UpdateStatus") }) diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index 78269c551..d40bac3e0 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -232,12 +232,12 @@ func (c *ServiceImportController) startEndpointsController(serviceImport *mcsv1a return nil } -func (c *ServiceImportController) stopEndpointsController(key string) (bool, error) { +func (c *ServiceImportController) stopEndpointsController(ctx context.Context, key string) (bool, error) { if obj, found := c.endpointControllers.Load(key); found { endpointController := obj.(*ServiceEndpointSliceController) endpointController.stop() - found, err := endpointController.cleanup() + found, err := endpointController.cleanup(ctx) if err == nil { c.endpointControllers.Delete(key) } @@ -251,6 +251,7 @@ func (c *ServiceImportController) stopEndpointsController(key string) (bool, err func (c *ServiceImportController) onLocalServiceImport(obj runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { serviceImport := obj.(*mcsv1a1.ServiceImport) key, _ := cache.MetaNamespaceKeyFunc(serviceImport) + ctx := context.TODO() serviceName := serviceImportSourceName(serviceImport) @@ -262,13 +263,13 @@ func (c *ServiceImportController) onLocalServiceImport(obj runtime.Object, _ int logger.V(log.DEBUG).Infof("Local ServiceImport %q %sd", key, op) if op == syncer.Delete { - c.serviceExportClient.updateStatusConditions(serviceName, serviceImport.Labels[constants.LabelSourceNamespace], + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceImport.Labels[constants.LabelSourceNamespace], newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, "NoServiceImport", "ServiceImport was deleted")) return obj, false } else if op == syncer.Create { - c.serviceExportClient.tryUpdateStatusConditions(serviceName, serviceImport.Labels[constants.LabelSourceNamespace], + c.serviceExportClient.tryUpdateStatusConditions(ctx, serviceName, serviceImport.Labels[constants.LabelSourceNamespace], false, newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, "AwaitingExport", fmt.Sprintf("ServiceImport %sd - awaiting aggregation on the broker", op))) } @@ -276,7 +277,7 @@ func (c *ServiceImportController) onLocalServiceImport(obj runtime.Object, _ int return obj, false } -func (c *ServiceImportController) Distribute(obj runtime.Object) error { +func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Object) error { localServiceImport := c.converter.toServiceImport(obj) key, _ := cache.MetaNamespaceKeyFunc(localServiceImport) @@ -306,7 +307,7 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { // is determined from the constituent clusters' EndpointSlices, thus each cluster must have a consistent view of all // the EndpointSlices in order for the aggregated port information to be eventually consistent. - result, err := util.CreateOrUpdate[runtime.Object](context.Background(), + result, err := util.CreateOrUpdate[runtime.Object](ctx, resource.ForDynamic(c.serviceImportAggregator.brokerServiceImportClient()), c.converter.toUnstructured(aggregate), func(obj runtime.Object) (runtime.Object, error) { @@ -319,11 +320,12 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { fmt.Sprintf("The service type %q does not match the type (%q) of the existing service export", localServiceImport.Spec.Type, existing.Spec.Type)) - c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, conflictCondition, + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceNamespace, conflictCondition, newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason, "Unable to export due to an irresolvable conflict")) } else { - c.serviceExportClient.removeStatusCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, typeConflictReason) + c.serviceExportClient.removeStatusCondition(ctx, serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, + typeConflictReason) } return obj, nil @@ -333,7 +335,7 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { } if err != nil { - c.serviceExportClient.updateStatusConditions(serviceName, serviceNamespace, + c.serviceExportClient.updateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason, fmt.Sprintf("Unable to export: %v", err))) } @@ -345,7 +347,7 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { return err } -func (c *ServiceImportController) Delete(obj runtime.Object) error { +func (c *ServiceImportController) Delete(ctx context.Context, obj runtime.Object) error { localServiceImport := c.converter.toServiceImport(obj) key, _ := cache.MetaNamespaceKeyFunc(localServiceImport) @@ -356,13 +358,13 @@ func (c *ServiceImportController) Delete(obj runtime.Object) error { // was never started or if there are no local EndpointSlices, which can happen during reconciliation on startup or // during clean up on uninstall, then we handle removal here. - found, err := c.stopEndpointsController(key) + found, err := c.stopEndpointsController(ctx, key) if err != nil { return err } if !found { - err = c.serviceImportAggregator.updateOnDelete(serviceImportSourceName(localServiceImport), + err = c.serviceImportAggregator.updateOnDelete(ctx, serviceImportSourceName(localServiceImport), localServiceImport.Labels[constants.LabelSourceNamespace]) } @@ -370,7 +372,7 @@ func (c *ServiceImportController) Delete(obj runtime.Object) error { return err } - return c.serviceImportMigrator.onLocalServiceImportDeleted(localServiceImport) + return c.serviceImportMigrator.onLocalServiceImportDeleted(ctx, localServiceImport) } func (c *ServiceImportController) onRemoteServiceImport(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { diff --git a/pkg/agent/controller/service_import_aggregator.go b/pkg/agent/controller/service_import_aggregator.go index 5b085b8e7..dd2b6381b 100644 --- a/pkg/agent/controller/service_import_aggregator.go +++ b/pkg/agent/controller/service_import_aggregator.go @@ -48,8 +48,8 @@ func newServiceImportAggregator(brokerClient dynamic.Interface, brokerNamespace, } } -func (a *ServiceImportAggregator) updateOnCreateOrUpdate(name, namespace string) error { - return a.update(name, namespace, func(existing *mcsv1a1.ServiceImport) error { +func (a *ServiceImportAggregator) updateOnCreateOrUpdate(ctx context.Context, name, namespace string) error { + return a.update(ctx, name, namespace, func(existing *mcsv1a1.ServiceImport) error { var added bool existing.Status.Clusters, added = slices.AppendIfNotPresent(existing.Status.Clusters, @@ -60,11 +60,11 @@ func (a *ServiceImportAggregator) updateOnCreateOrUpdate(name, namespace string) a.clusterID, existing.Name, existing.Status.Clusters) } - return a.setServicePorts(existing) + return a.setServicePorts(ctx, existing) }) } -func (a *ServiceImportAggregator) setServicePorts(si *mcsv1a1.ServiceImport) error { +func (a *ServiceImportAggregator) setServicePorts(ctx context.Context, si *mcsv1a1.ServiceImport) error { // We don't set the port info for headless services. if si.Spec.Type != mcsv1a1.ClusterSetIP { return nil @@ -73,7 +73,7 @@ func (a *ServiceImportAggregator) setServicePorts(si *mcsv1a1.ServiceImport) err serviceName := si.Annotations[mcsv1a1.LabelServiceName] serviceNamespace := si.Annotations[constants.LabelSourceNamespace] - list, err := a.brokerClient.Resource(endpointSliceGVR).Namespace(a.brokerNamespace).List(context.Background(), metav1.ListOptions{ + list, err := a.brokerClient.Resource(endpointSliceGVR).Namespace(a.brokerNamespace).List(ctx, metav1.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{ discovery.LabelManagedBy: constants.LabelValueManagedBy, constants.LabelSourceNamespace: serviceNamespace, @@ -97,8 +97,8 @@ func (a *ServiceImportAggregator) setServicePorts(si *mcsv1a1.ServiceImport) err return nil } -func (a *ServiceImportAggregator) updateOnDelete(name, namespace string) error { - return a.update(name, namespace, func(existing *mcsv1a1.ServiceImport) error { +func (a *ServiceImportAggregator) updateOnDelete(ctx context.Context, name, namespace string) error { + return a.update(ctx, name, namespace, func(existing *mcsv1a1.ServiceImport) error { var removed bool existing.Status.Clusters, removed = slices.Remove(existing.Status.Clusters, mcsv1a1.ClusterStatus{Cluster: a.clusterID}, @@ -110,18 +110,18 @@ func (a *ServiceImportAggregator) updateOnDelete(name, namespace string) error { logger.V(log.DEBUG).Infof("Removed cluster name %q from aggregated ServiceImport %q. New status: %#v", a.clusterID, existing.Name, existing.Status.Clusters) - return a.setServicePorts(existing) + return a.setServicePorts(ctx, existing) }) } -func (a *ServiceImportAggregator) update(name, namespace string, mutate func(*mcsv1a1.ServiceImport) error) error { +func (a *ServiceImportAggregator) update(ctx context.Context, name, namespace string, mutate func(*mcsv1a1.ServiceImport) error) error { aggregate := &mcsv1a1.ServiceImport{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%s", name, namespace), }, } - return util.Update[runtime.Object](context.Background(), resource.ForDynamic(a.brokerServiceImportClient()), + return util.Update[runtime.Object](ctx, resource.ForDynamic(a.brokerServiceImportClient()), a.converter.toUnstructured(aggregate), func(obj runtime.Object) (runtime.Object, error) { existing := a.converter.toServiceImport(obj) @@ -134,7 +134,7 @@ func (a *ServiceImportAggregator) update(name, namespace string, mutate func(*mc if len(existing.Status.Clusters) == 0 { logger.V(log.DEBUG).Infof("Deleting aggregated ServiceImport %q", existing.Name) - err := a.brokerServiceImportClient().Delete(context.Background(), existing.Name, metav1.DeleteOptions{ + err := a.brokerServiceImportClient().Delete(ctx, existing.Name, metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{ ResourceVersion: ptr.To(existing.ResourceVersion), }, diff --git a/pkg/agent/controller/service_import_migrator.go b/pkg/agent/controller/service_import_migrator.go index bbd903e7e..f75509794 100644 --- a/pkg/agent/controller/service_import_migrator.go +++ b/pkg/agent/controller/service_import_migrator.go @@ -110,7 +110,7 @@ func (c *ServiceImportMigrator) onSuccessfulSyncFromBroker(obj runtime.Object, o logger.Infof("All remote clusters have been upgraded for service \"%s/%s\" - removing local ServiceImport %q from the broker", aggregatedServiceImport.Namespace, aggregatedServiceImport.Name, localServiceImportName) - err := c.brokerClient.Delete(context.Background(), localServiceImportName, metav1.DeleteOptions{}) + err := c.brokerClient.Delete(context.TODO(), localServiceImportName, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { logger.Error(err, "error deleting legacy ServiceImport from the broker") return true @@ -122,14 +122,14 @@ func (c *ServiceImportMigrator) onSuccessfulSyncFromBroker(obj runtime.Object, o return false } -func (c *ServiceImportMigrator) onLocalServiceImportDeleted(serviceImport *mcsv1a1.ServiceImport) error { +func (c *ServiceImportMigrator) onLocalServiceImportDeleted(ctx context.Context, serviceImport *mcsv1a1.ServiceImport) error { if serviceImport.Labels[LegacySourceClusterLabel] != c.clusterID { return nil } logger.Infof("Legacy local ServiceImport %q deleted - removing from the broker", serviceImport.Name) - err := c.brokerClient.Delete(context.Background(), serviceImport.Name, metav1.DeleteOptions{}) + err := c.brokerClient.Delete(ctx, serviceImport.Name, metav1.DeleteOptions{}) if apierrors.IsNotFound(err) { err = nil } diff --git a/pkg/agent/main.go b/pkg/agent/main.go index b44aac4c9..94296296d 100644 --- a/pkg/agent/main.go +++ b/pkg/agent/main.go @@ -19,7 +19,6 @@ limitations under the License. package main import ( - "context" "errors" "flag" "fmt" @@ -139,7 +138,7 @@ func main() { if agentSpec.Uninstall { logger.Info("Uninstalling lighthouse") - err := lightHouseAgent.Cleanup() + err := lightHouseAgent.Cleanup(ctx) exitOnError(err, "Error cleaning up the lighthouse agent controller") return @@ -154,7 +153,7 @@ func main() { logger.Info("All controllers stopped or exited. Stopping main loop") - if err := httpServer.Shutdown(context.TODO()); err != nil { + if err := httpServer.Shutdown(ctx); err != nil { logger.Error(err, "Error shutting down metrics HTTP server") } }