From cf517f899060cf823b330a0232ce3bad6bfc02cc Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Wed, 4 Sep 2024 10:07:27 -0400
Subject: [PATCH] Implement clusterset VIP for ServiceImports in LH agent

As per
https://github.com/submariner-io/enhancements/blob/devel/lighthouse/mcs-compliance.md

Highlights:

- The clusterset IP enabled flag and the clusterset IP CIDR are passed via the
  env vars SUBMARINER_CLUSTERSET_IP_ENABLED and SUBMARINER_CLUSTERSET_IP_CIDR.
- If no CIDR is passed, clusterset IP functionality is disabled.
- Clusterset IP allocation is determined by the ServiceExport annotation
  "lighthouse.submariner.io/use-clusterset-ip" or, if not set, by the global
  enablement flag (see the sketch after the diff).
- Clusterset IP allocation is done only on creation of the aggregated
  ServiceImport. The "lighthouse.submariner.io/clusterset-ip-allocated-by"
  annotation is added by the allocating cluster.
- The "lighthouse.submariner.io/use-clusterset-ip" annotation is always set
  on the aggregated ServiceImport.
- If an exporting cluster's local clusterset IP enablement does not match the
  annotation setting on the aggregated ServiceImport, a
  "ConflictingClusterSetIPEnablement" Conflict condition is set.
- If the allocating cluster's service is unexported, the ServiceImport's
  clusterset IP remains and is not released to the pool.
- When a service is unexported from all clusters and the aggregated
  ServiceImport is deleted, the allocating cluster releases the clusterset IP
  back to the pool.
- On restart, before starting the syncers, the controller iterates through
  all the local aggregated ServiceImports and reserves any clusterset IPs
  that were allocated from its pool, i.e. those in its CIDR range.

Signed-off-by: Tom Pantelis
---
 go.mod                                        |   1 +
 go.sum                                        |   2 +
 pkg/agent/controller/agent.go                 |  15 +-
 .../controller/clusterip_service_test.go      | 173 +++++++++++++++++-
 pkg/agent/controller/controller_suite_test.go |  48 ++++-
 pkg/agent/controller/reconciliation_test.go   |  29 ++-
 pkg/agent/controller/service_import.go        | 153 ++++++++++++++--
 .../service_import_migration_test.go          |  55 +++++-
 pkg/agent/controller/types.go                 |  30 +--
 pkg/agent/main.go                             |  12 +-
 pkg/constants/constants.go                    |   2 +
 11 files changed, 476 insertions(+), 44 deletions(-)

diff --git a/go.mod b/go.mod
index 5d0ec49df..2e106d83f 100644
--- a/go.mod
+++ b/go.mod
@@ -24,6 +24,7 @@ require (
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
 	github.com/emicklei/go-restful/v3 v3.11.0 // indirect
+	github.com/emirpasic/gods v1.18.1 // indirect
 	github.com/evanphx/json-patch/v5 v5.9.0 // indirect
 	github.com/fxamacker/cbor/v2 v2.7.0 // indirect
 	github.com/go-logr/logr v1.4.2 // indirect
diff --git a/go.sum b/go.sum
index 9703be626..dcbbf1526 100644
--- a/go.sum
+++ b/go.sum
@@ -73,6 +73,8 @@ github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb
 github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
 github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
+github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index f40b3a02f..87491cb57 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -27,6 +27,7 @@ import ( "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/federate" + "github.com/submariner-io/admiral/pkg/ipam" "github.com/submariner-io/admiral/pkg/log" "github.com/submariner-io/admiral/pkg/syncer" "github.com/submariner-io/admiral/pkg/syncer/broker" @@ -52,12 +53,13 @@ const ( type AgentConfig struct { ServiceImportCounterName string ServiceExportCounterName string + IPPool *ipam.IPPool } var logger = log.Logger{Logger: logf.Log.WithName("agent")} //nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer. -func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricNames AgentConfig) (*Controller, error) { +func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, agentConfig AgentConfig) (*Controller, error) { if errs := validations.IsDNS1123Label(spec.ClusterID); len(errs) > 0 { return nil, errors.Errorf("%s is not a valid ClusterID %v", spec.ClusterID, errs) } @@ -124,7 +126,7 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN return nil, err } - agentController.serviceImportController, err = newServiceImportController(spec, syncerMetricNames, syncerConf, + agentController.serviceImportController, err = newServiceImportController(spec, agentConfig, syncerConf, agentController.endpointSliceController.syncer.GetBrokerClient(), agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient, func(selector k8slabels.Selector) []runtime.Object { @@ -244,6 +246,10 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op serviceImport := a.newServiceImport(svcExport.Name, svcExport.Namespace) serviceImport.Annotations[constants.PublishNotReadyAddresses] = strconv.FormatBool(svc.Spec.PublishNotReadyAddresses) + if svcExport.Annotations[constants.UseClustersetIP] != "" { + serviceImport.Annotations[constants.UseClustersetIP] = svcExport.Annotations[constants.UseClustersetIP] + } + serviceImport.Spec = mcsv1a1.ServiceImportSpec{ Ports: []mcsv1a1.ServicePort{}, Type: svcType, @@ -303,6 +309,11 @@ func getServiceImportType(service *corev1.Service) (mcsv1a1.ServiceImportType, b } func (a *Controller) shouldProcessServiceExportUpdate(oldObj, newObj *unstructured.Unstructured) bool { + // To reduce unnecessary churn, only process a ServiceExport update if the UseClustersetIP annotation or the Valid condition changed. + if oldObj.GetAnnotations()[constants.UseClustersetIP] != newObj.GetAnnotations()[constants.UseClustersetIP] { + return true + } + oldValidCond := FindServiceExportStatusCondition(a.toServiceExport(oldObj).Status.Conditions, mcsv1a1.ServiceExportValid) newValidCond := FindServiceExportStatusCondition(a.toServiceExport(newObj).Status.Conditions, mcsv1a1.ServiceExportValid) diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go index 9e61af967..300704a43 100644 --- a/pkg/agent/controller/clusterip_service_test.go +++ b/pkg/agent/controller/clusterip_service_test.go @@ -19,11 +19,13 @@ limitations under the License. package controller_test import ( + "context" "fmt" "strconv" "time" . 
"github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" "github.com/submariner-io/admiral/pkg/fake" "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/test" @@ -214,6 +216,90 @@ func testClusterIPServiceInOneCluster() { }) }) + Context("with clusterset IP enabled", func() { + BeforeEach(func() { + t.useClusterSetIP = true + }) + + JustBeforeEach(func() { + t.cluster1.createService() + t.cluster1.createServiceExport() + }) + + Context("via ServiceExport annotation", func() { + BeforeEach(func() { + t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)} + }) + + It("should allocate an IP for the aggregated ServiceImport and release the IP when unexported", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1) + + localSI := getServiceImport(t.cluster1.localServiceImportClient, t.cluster1.service.Namespace, t.cluster1.service.Name) + Expect(localSI.Annotations).To(HaveKeyWithValue(constants.ClustersetIPAllocatedBy, t.cluster1.clusterID)) + + By("Unexporting the service") + + t.cluster1.deleteServiceExport() + + Eventually(func() error { + return t.ipPool.Reserve(localSI.Spec.IPs...) + }).Should(Succeed(), "ServiceImport IP was not released") + }) + + Context("but with no IP pool specified", func() { + BeforeEach(func() { + t.useClusterSetIP = false + t.ipPool = nil + }) + + It("should not set the IP on the aggregated ServiceImport", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1) + }) + }) + + Context("with the IP pool initially exhausted", func() { + var ips []string + + BeforeEach(func() { + var err error + + ips, err = t.ipPool.Allocate(t.ipPool.Size()) + Expect(err).To(Succeed()) + }) + + It("should eventually set the IP on the aggregated ServiceImport", func() { + t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, + controller.ExportFailedReason)) + + _ = t.ipPool.Release(ips...) 
+ + t.awaitNonHeadlessServiceExported(&t.cluster1) + }) + }) + }) + + Context("via the global setting", func() { + BeforeEach(func() { + t.cluster1.agentSpec.ClustersetIPEnabled = true + }) + + It("should set the IP on the aggregated ServiceImport", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1) + }) + + Context("but disabled via ServiceExport annotation", func() { + BeforeEach(func() { + t.useClusterSetIP = false + t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(false)} + }) + + It("should not set the IP on the aggregated ServiceImport", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1) + }) + }) + }) + }) + When("two Services with the same name in different namespaces are exported", func() { It("should correctly export both services", func() { t.cluster1.createService() @@ -288,7 +374,7 @@ func testClusterIPServiceInOneCluster() { test.CreateResource(t.cluster1.dynamicServiceClientFor().Namespace(service.Namespace), service) test.CreateResource(serviceExportClientFor(t.cluster1.localDynClient, service.Namespace), serviceExport) - awaitServiceImport(t.cluster2.localServiceImportClient, expServiceImport) + awaitServiceImport(t.cluster2.localServiceImportClient, expServiceImport, t.ipPool) awaitEndpointSlice(endpointSliceClientFor(t.cluster2.localDynClient, service.Namespace), service.Name, expEndpointSlice) // Ensure the resources for the first Service weren't overwritten @@ -346,7 +432,7 @@ func testClusterIPServiceInOneCluster() { }, } - awaitServiceImport(t.cluster1.localServiceImportClient, expServiceImport) + awaitServiceImport(t.cluster1.localServiceImportClient, expServiceImport, t.ipPool) testutil.EnsureNoResource(resource.ForDynamic(t.cluster2.localServiceImportClient.Namespace( t.cluster1.service.Namespace)), t.cluster1.service.Name) @@ -402,7 +488,7 @@ func testClusterIPServiceInTwoClusters() { t.cluster1.start(t, *t.syncerConfig) // Sleep a little before starting the second cluster to ensure its resource CreationTimestamps will be - // later than the first cluster to ensure conflict checking in deterministic. + // later than the first cluster to ensure conflict checking is deterministic. 
time.Sleep(100 * time.Millisecond) t.cluster2.createServiceEndpointSlices() @@ -671,6 +757,87 @@ func testClusterIPServiceInTwoClusters() { t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) }) }) + + Context("with clusterset IP enabled on the first exporting cluster but not the second", func() { + BeforeEach(func() { + t.useClusterSetIP = true + t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)} + }) + + JustBeforeEach(func() { + t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) + }) + + It("should set the Conflict status condition on the second cluster", func() { + t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.ClusterSetIPEnablementConflictReason)) + t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + + By("Updating the ServiceExport on the second cluster") + + se, err := t.cluster2.localServiceExportClient.Get(context.TODO(), t.cluster2.serviceExport.Name, metav1.GetOptions{}) + Expect(err).To(Succeed()) + + se.SetAnnotations(map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)}) + test.UpdateResource(t.cluster2.localServiceExportClient, se) + + t.cluster2.awaitServiceExportCondition(noConflictCondition) + }) + + It("should not release the allocated clusterset IP until all clusters have unexported", func() { + localSI := getServiceImport(t.cluster1.localServiceImportClient, t.cluster1.service.Namespace, t.cluster1.service.Name) + + By("Unexporting service on the first cluster") + + t.cluster1.deleteServiceExport() + + t.awaitNoEndpointSlice(&t.cluster1) + t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2) + + Consistently(func() error { + return t.ipPool.Reserve(localSI.Spec.IPs...) + }).ShouldNot(Succeed(), "ServiceImport IP was released") + + By("Unexporting service on the second cluster") + + t.cluster2.deleteServiceExport() + + t.awaitServiceUnexported(&t.cluster2) + + Eventually(func() error { + return t.ipPool.Reserve(localSI.Spec.IPs...) 
+ }).Should(Succeed(), "ServiceImport IP was not released") + }) + }) + + Context("with clusterset IP disabled on the first exporting cluster but enabled on the second", func() { + BeforeEach(func() { + t.cluster2.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)} + }) + + It("should set the Conflict status condition on the second cluster", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) + t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.ClusterSetIPEnablementConflictReason)) + t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + }) + }) + + Context("with clusterset IP enabled on both clusters", func() { + BeforeEach(func() { + t.useClusterSetIP = true + t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)} + t.cluster2.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)} + }) + + Specify("the first cluster should allocate the clusterset IP", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) + + localSI := getServiceImport(t.cluster1.localServiceImportClient, t.cluster1.service.Namespace, t.cluster1.service.Name) + Expect(localSI.Annotations).To(HaveKeyWithValue(constants.ClustersetIPAllocatedBy, t.cluster1.clusterID)) + + t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + }) + }) } func testClusterIPServiceWithMultipleEPS() { diff --git a/pkg/agent/controller/controller_suite_test.go b/pkg/agent/controller/controller_suite_test.go index 472c46f60..d3e258714 100644 --- a/pkg/agent/controller/controller_suite_test.go +++ b/pkg/agent/controller/controller_suite_test.go @@ -34,6 +34,7 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "github.com/submariner-io/admiral/pkg/fake" + "github.com/submariner-io/admiral/pkg/ipam" "github.com/submariner-io/admiral/pkg/log/kzerolog" "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/broker" @@ -143,6 +144,8 @@ type testDriver struct { stopCh chan struct{} syncerConfig *broker.SyncerConfig doStart bool + useClusterSetIP bool + ipPool *ipam.IPPool brokerServiceImportReactor *fake.FailingReactor aggregatedServicePorts []mcsv1a1.ServicePort aggregatedSessionAffinity corev1.ServiceAffinity @@ -304,6 +307,11 @@ func newTestDiver() *testDriver { doStart: true, } + var err error + + t.ipPool, err = ipam.NewIPPool("243.10.1.0/24", nil) + Expect(err).To(Succeed()) + t.brokerServiceImportReactor = fake.NewFailingReactorForResource(&brokerClient.Fake, "serviceimports") t.brokerEndpointSliceReactor = fake.NewFailingReactorForResource(&brokerClient.Fake, "endpointslices") @@ -384,6 +392,7 @@ func (c *cluster) start(t *testDriver, syncerConfig broker.SyncerConfig) { controller.AgentConfig{ ServiceImportCounterName: serviceImportCounterName, ServiceExportCounterName: serviceExportCounterName, + IPPool: t.ipPool, }) Expect(err).To(Succeed()) @@ -625,7 +634,8 @@ func (c *cluster) ensureNoServiceExportActions() { }, 500*time.Millisecond).Should(BeEmpty()) } -func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected *mcsv1a1.ServiceImport) { +func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected *mcsv1a1.ServiceImport, ipPool *ipam.IPPool, +) *mcsv1a1.ServiceImport { sortSlices := func(si *mcsv1a1.ServiceImport) { sort.SliceStable(si.Spec.Ports, func(i, j int) bool { return si.Spec.Ports[i].Port < si.Spec.Ports[j].Port @@ -638,6 +648,10 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected sortSlices(expected) + expected = expected.DeepCopy() + expectedServiceImportIPs := expected.Spec.IPs + expected.Spec.IPs = nil + var serviceImport *mcsv1a1.ServiceImport err := wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, @@ -651,7 +665,12 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected sortSlices(serviceImport) - return reflect.DeepEqual(&expected.Spec, &serviceImport.Spec) && reflect.DeepEqual(&expected.Status, &serviceImport.Status), nil + ipsEquivalent := len(expectedServiceImportIPs) == len(serviceImport.Spec.IPs) + actualSpec := serviceImport.Spec.DeepCopy() + actualSpec.IPs = nil + + return ipsEquivalent && reflect.DeepEqual(&expected.Spec, actualSpec) && + reflect.DeepEqual(&expected.Status, &serviceImport.Status), nil }) if !wait.Interrupted(err) { @@ -662,10 +681,21 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected Fail(fmt.Sprintf("ServiceImport %s/%s not found", expected.Namespace, expected.Name)) } + Expect(serviceImport.Spec.IPs).To(HaveLen(len(expectedServiceImportIPs))) + + if len(serviceImport.Spec.IPs) > 0 { + Expect(ipPool.Reserve(serviceImport.Spec.IPs...)).ToNot(Succeed(), ""+ + "ServiceImport IP was not allocated or reserved") + } + + serviceImport.Spec.IPs = nil + Expect(serviceImport.Spec).To(Equal(expected.Spec)) Expect(serviceImport.Status).To(Equal(expected.Status)) Expect(serviceImport.Labels).To(BeEmpty()) + + return serviceImport } func getServiceImport(client dynamic.NamespaceableResourceInterface, namespace, name string) *mcsv1a1.ServiceImport { @@ -795,6 +825,10 @@ func (t *testDriver) awaitAggregatedServiceImport(sType 
mcsv1a1.ServiceImportTyp if len(clusters) > 0 { if sType == mcsv1a1.ClusterSetIP { expServiceImport.Spec.Ports = t.aggregatedServicePorts + + if t.useClusterSetIP { + expServiceImport.Spec.IPs = []string{"1.1.1.1"} + } } for _, c := range clusters { @@ -803,13 +837,17 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp } } - awaitServiceImport(t.brokerServiceImportClient, expServiceImport) + actual := awaitServiceImport(t.brokerServiceImportClient, expServiceImport, t.ipPool) + + if sType == mcsv1a1.ClusterSetIP { + Expect(actual.Annotations).To(HaveKeyWithValue(constants.UseClustersetIP, strconv.FormatBool(t.useClusterSetIP))) + } expServiceImport.Name = name expServiceImport.Namespace = ns - awaitServiceImport(t.cluster1.localServiceImportClient, expServiceImport) - awaitServiceImport(t.cluster2.localServiceImportClient, expServiceImport) + awaitServiceImport(t.cluster1.localServiceImportClient, expServiceImport, t.ipPool) + awaitServiceImport(t.cluster2.localServiceImportClient, expServiceImport, t.ipPool) } func (t *testDriver) ensureAggregatedServiceImport(sType mcsv1a1.ServiceImportType, name, ns string, clusters ...*cluster) { diff --git a/pkg/agent/controller/reconciliation_test.go b/pkg/agent/controller/reconciliation_test.go index 2ab140ada..e4354920e 100644 --- a/pkg/agent/controller/reconciliation_test.go +++ b/pkg/agent/controller/reconciliation_test.go @@ -41,12 +41,13 @@ import ( var _ = Describe("Reconciliation", func() { var ( - t *testDriver - serviceExport *mcsv1a1.ServiceExport - localServiceImport *mcsv1a1.ServiceImport - localEndpointSlice *discovery.EndpointSlice - brokerServiceImports *unstructured.UnstructuredList - brokerEndpointSlices *unstructured.UnstructuredList + t *testDriver + serviceExport *mcsv1a1.ServiceExport + localServiceImport *mcsv1a1.ServiceImport + localAggregatedServiceImport *unstructured.Unstructured + localEndpointSlice *discovery.EndpointSlice + brokerServiceImports *unstructured.UnstructuredList + brokerEndpointSlices *unstructured.UnstructuredList ) BeforeEach(func() { @@ -55,6 +56,7 @@ var _ = Describe("Reconciliation", func() { JustBeforeEach(func() { t.justBeforeEach() + t.cluster1.createServiceEndpointSlices() t.cluster1.createService() t.cluster1.createServiceExport() @@ -76,6 +78,11 @@ var _ = Describe("Reconciliation", func() { localServiceImport = t.cluster1.findLocalServiceImport() Expect(localServiceImport).ToNot(BeNil()) + localAggregatedServiceImport, err = t.cluster1.localServiceImportClient.Namespace(serviceNamespace).Get(context.TODO(), + serviceName, metav1.GetOptions{}) + Expect(err).To(Succeed()) + localAggregatedServiceImport.SetResourceVersion("") + endpointSlices := t.cluster1.findLocalEndpointSlices() Expect(endpointSlices).To(HaveLen(1)) localEndpointSlice = endpointSlices[0] @@ -100,14 +107,24 @@ var _ = Describe("Reconciliation", func() { } Context("on restart after a service was exported", func() { + BeforeEach(func() { + t.useClusterSetIP = true + t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)} + }) + It("should retain the exported resources on reconciliation", func() { t.afterEach() t = newTestDiver() + t.useClusterSetIP = true test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport) test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice) test.CreateResource(t.cluster1.localServiceExportClient, serviceExport) + _, err := 
t.cluster1.localServiceImportClient.Namespace(serviceNamespace).Create(context.TODO(), localAggregatedServiceImport, + metav1.CreateOptions{}) + Expect(err).To(Succeed()) + restoreBrokerResources() t.cluster1.createServiceEndpointSlices() diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index b584a39ab..aea5de8d5 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "math" + "net" "reflect" "strconv" "strings" @@ -52,7 +53,7 @@ import ( const timestampAnnotationPrefix = "timestamp.submariner.io/" //nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer. -func newServiceImportController(spec *AgentSpecification, syncerMetricNames AgentConfig, syncerConfig broker.SyncerConfig, +func newServiceImportController(spec *AgentSpecification, agentConfig AgentConfig, syncerConfig broker.SyncerConfig, brokerClient dynamic.Interface, brokerNamespace string, serviceExportClient *ServiceExportClient, localLHEndpointSliceLister EndpointSliceListerFn, ) (*ServiceImportController, error) { @@ -65,6 +66,8 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen serviceImportAggregator: newServiceImportAggregator(brokerClient, brokerNamespace, spec.ClusterID, syncerConfig.Scheme), serviceExportClient: serviceExportClient, localLHEndpointSliceLister: localLHEndpointSliceLister, + clustersetIPPool: agentConfig.IPPool, + clustersetIPEnabled: spec.ClustersetIPEnabled, } var err error @@ -80,7 +83,7 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen Transform: controller.onLocalServiceImport, Scheme: syncerConfig.Scheme, SyncCounterOpts: &prometheus.GaugeOpts{ - Name: syncerMetricNames.ServiceExportCounterName, + Name: agentConfig.ServiceExportCounterName, Help: "Count of exported services", }, }) @@ -109,7 +112,7 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen Scheme: syncerConfig.Scheme, NamespaceInformer: syncerConfig.NamespaceInformer, SyncCounterOpts: &prometheus.GaugeOpts{ - Name: syncerMetricNames.ServiceImportCounterName, + Name: agentConfig.ServiceImportCounterName, Help: "Count of imported services", }, }) @@ -146,6 +149,10 @@ func (c *ServiceImportController) start(stopCh <-chan struct{}) error { logger.Info("ServiceImport Controller stopped") }() + if err := c.reserveAggregatedServiceImportIPs(); err != nil { + return err + } + if err := c.localSyncer.Start(stopCh); err != nil { return errors.Wrap(err, "error starting local ServiceImport syncer") } @@ -160,6 +167,42 @@ func (c *ServiceImportController) start(stopCh <-chan struct{}) error { return nil } +func (c *ServiceImportController) isIPInClustersetCIDR(si *mcsv1a1.ServiceImport) bool { + if c.clustersetIPPool == nil || len(si.Spec.IPs) == 0 { + return false + } + + ip := net.ParseIP(si.Spec.IPs[0]) + _, cidr, _ := net.ParseCIDR(c.clustersetIPPool.GetCIDR()) + + return ip != nil && cidr.Contains(ip) +} + +func (c *ServiceImportController) reserveAggregatedServiceImportIPs() error { + client := c.localClient.Resource(serviceImportGVR).Namespace(corev1.NamespaceAll) + + list, err := client.List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return errors.Wrap(err, "error listing the local ServiceImports") + } + + for i := range list.Items { + si := c.converter.toServiceImport(&list.Items[i]) + + if serviceImportSourceName(si) != "" || !c.isIPInClustersetCIDR(si) { + 
continue + } + + err = c.clustersetIPPool.Reserve(si.Spec.IPs[0]) + if err != nil { + logger.Errorf(err, "Unable to reserve clusterset IP %q in CIDR %q for ServiceImport %s", + si.Spec.IPs[0], c.clustersetIPPool.GetCIDR(), resource.ToJSON(si)) + } + } + + return nil +} + func (c *ServiceImportController) reconcileRemoteAggregatedServiceImports() { c.localSyncer.Reconcile(func() []runtime.Object { siList := c.remoteSyncer.ListResources() @@ -335,6 +378,9 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob } typeConflict := false + clusterSetIP := "" + + useClusterSetIP := c.determineUseClusterSetIP(localServiceImport) // Here we create the aggregated ServiceImport on the broker or update the existing instance with our local service // info, but we don't add/merge our local service ports until we've successfully synced our local EndpointSlice to @@ -342,10 +388,10 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob // EndpointSlices, thus each cluster must have a consistent view of all the EndpointSlices in order for the // aggregated port information to be eventually consistent. - result, err := util.CreateOrUpdate(ctx, - resource.ForDynamic(c.serviceImportAggregator.brokerServiceImportClient()), - c.converter.toUnstructured(aggregate), - func(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + result, newAggregate, err := util.CreateOrUpdateWithOptions(ctx, util.CreateOrUpdateOptions[*unstructured.Unstructured]{ + Client: resource.ForDynamic(c.serviceImportAggregator.brokerServiceImportClient()), + Obj: c.converter.toUnstructured(aggregate), + MutateOnUpdate: func(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { existing := c.converter.toServiceImport(obj) if localServiceImport.Spec.Type != existing.Spec.Type { @@ -370,10 +416,15 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob existing.Annotations[timestampAnnotationKey] = localTimestamp + if _, found := existing.Annotations[constants.UseClustersetIP]; !found { + // This will happen on migration from pre-clusterset IP version + existing.Annotations[constants.UseClustersetIP] = strconv.FormatBool(false) + } + // Update the appropriate aggregated ServiceImport fields if we're the oldest exporting cluster _ = c.updateAggregatedServiceImport(existing, localServiceImport) - c.checkConflicts(ctx, existing, localServiceImport) + c.checkConflicts(ctx, existing, localServiceImport, &useClusterSetIP) var added bool @@ -387,7 +438,28 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob } return c.converter.toUnstructured(existing), nil - }) + }, + MutateOnCreate: func(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + si := c.converter.toServiceImport(obj) + + if si.Spec.Type != mcsv1a1.ClusterSetIP { + return obj, nil + } + + var err error + + if useClusterSetIP { + clusterSetIP, err = c.allocateClusterSetIPIfNeeded(clusterSetIP) + + si.Spec.IPs = []string{clusterSetIP} + si.Annotations[constants.ClustersetIPAllocatedBy] = c.clusterID + } + + si.Annotations[constants.UseClustersetIP] = strconv.FormatBool(useClusterSetIP) + + return c.converter.toUnstructured(si), err + }, + }) if err == nil && !typeConflict { err = c.startEndpointsController(localServiceImport) } @@ -396,10 +468,14 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, 
newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, ExportFailedReason, fmt.Sprintf("Unable to export: %v", err))) + + if clusterSetIP != "" { + _ = c.clustersetIPPool.Release(clusterSetIP) + } } if result == util.OperationResultCreated { - logger.V(log.DEBUG).Infof("Created aggregated ServiceImport %s", resource.ToJSON(aggregate)) + logger.V(log.DEBUG).Infof("Created aggregated ServiceImport %s", resource.ToJSON(newAggregate)) } return err @@ -458,6 +534,14 @@ func (c *ServiceImportController) onSuccessfulSyncFromBroker(synced runtime.Obje aggregatedServiceImport := synced.(*mcsv1a1.ServiceImport) + if op == syncer.Delete { + if c.isIPInClustersetCIDR(aggregatedServiceImport) { + _ = c.clustersetIPPool.Release(aggregatedServiceImport.Spec.IPs[0]) + } + + return retry + } + // Check for conflicts with the local ServiceImport siList := c.localSyncer.ListResourcesBySelector(k8slabels.SelectorFromSet(map[string]string{ @@ -491,12 +575,38 @@ func (c *ServiceImportController) onSuccessfulSyncFromBroker(synced runtime.Obje } } - c.checkConflicts(ctx, aggregatedServiceImport, localServiceImport) + c.checkConflicts(ctx, aggregatedServiceImport, localServiceImport, nil) return retry } -func (c *ServiceImportController) checkConflicts(ctx context.Context, aggregated, local *mcsv1a1.ServiceImport) { +func (c *ServiceImportController) determineUseClusterSetIP(localServiceImport *mcsv1a1.ServiceImport) bool { + var useClusterSetIP bool + + useClusterSetIPStr, found := localServiceImport.Annotations[constants.UseClustersetIP] + if found { + useClusterSetIP = useClusterSetIPStr == strconv.FormatBool(true) + } else { + useClusterSetIP = c.clustersetIPEnabled + } + + return useClusterSetIP && c.clustersetIPPool != nil +} + +func (c *ServiceImportController) allocateClusterSetIPIfNeeded(existingIP string) (string, error) { + if existingIP == "" { + allocatedIPs, err := c.clustersetIPPool.Allocate(1) + if err != nil { + return "", errors.Wrap(err, "unable to allocate clusterset IP from the pool") + } + + existingIP = allocatedIPs[0] + } + + return existingIP, nil +} + +func (c *ServiceImportController) checkConflicts(ctx context.Context, aggregated, local *mcsv1a1.ServiceImport, useClusterSetIP *bool) { var conditions []mcsv1a1.ServiceExportCondition serviceName := local.Labels[mcsv1a1.LabelServiceName] @@ -529,6 +639,25 @@ func (c *ServiceImportController) checkConflicts(ctx context.Context, aggregated mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, SessionAffinityConfigConflictReason, "")) } + if aggregated.Spec.Type == mcsv1a1.ClusterSetIP && useClusterSetIP != nil { + if aggregated.Annotations[constants.UseClustersetIP] != strconv.FormatBool(*useClusterSetIP) { + clusterName := aggregated.Annotations[constants.ClustersetIPAllocatedBy] + if clusterName == "" { + clusterName = precedentCluster + } + + conditions = append(conditions, newServiceExportCondition(mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, + ClusterSetIPEnablementConflictReason, + fmt.Sprintf("The local service clusterset IP enablement setting %q conflicts with the enablement setting %q"+ + " determined by the first exporting cluster %q.", + strconv.FormatBool(*useClusterSetIP), aggregated.Annotations[constants.UseClustersetIP], clusterName))) + } else if c.serviceExportClient.hasCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, + ClusterSetIPEnablementConflictReason) { + conditions = append(conditions, newServiceExportCondition( + mcsv1a1.ServiceExportConflict, 
corev1.ConditionFalse, ClusterSetIPEnablementConflictReason, "")) + } + } + c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, conditions...) } diff --git a/pkg/agent/controller/service_import_migration_test.go b/pkg/agent/controller/service_import_migration_test.go index 5400702a2..3cddd9b52 100644 --- a/pkg/agent/controller/service_import_migration_test.go +++ b/pkg/agent/controller/service_import_migration_test.go @@ -28,18 +28,67 @@ import ( "github.com/submariner-io/admiral/pkg/syncer/test" "github.com/submariner-io/lighthouse/pkg/agent/controller" "github.com/submariner-io/lighthouse/pkg/constants" + corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/dynamic" + "k8s.io/utils/ptr" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) -var _ = Describe("ServiceImport migration", func() { - Describe("after restart on upgrade with a service that was previously exported", testServiceImportMigration) +var _ = Describe("Legacy ServiceImport migration", func() { + Describe("after restart on upgrade with a service that was previously exported", testLegacyServiceImportMigration) }) -func testServiceImportMigration() { +var _ = Describe("Pre-clusterset IP ServiceImport migration", func() { + var t *testDriver + + BeforeEach(func() { + t = newTestDiver() + }) + + JustBeforeEach(func() { + test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &mcsv1a1.ServiceImport{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", serviceName, serviceNamespace), + Annotations: map[string]string{ + mcsv1a1.LabelServiceName: serviceName, + constants.LabelSourceNamespace: serviceNamespace, + }, + }, + Spec: mcsv1a1.ServiceImportSpec{ + Type: mcsv1a1.ClusterSetIP, + }, + }) + + t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP + t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{ + ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))}, + } + + t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity + t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig + + t.cluster1.createServiceEndpointSlices() + t.cluster1.createService() + t.cluster1.createServiceExport() + + t.justBeforeEach() + }) + + AfterEach(func() { + t.afterEach() + }) + + It("should update the existing aggregated ServiceImport and not create any Conflict conditions", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1) + + t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + }) +}) + +func testLegacyServiceImportMigration() { var ( t *testDriver legacyServiceImport *mcsv1a1.ServiceImport diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go index 8254de4a5..ab4a7e796 100644 --- a/pkg/agent/controller/types.go +++ b/pkg/agent/controller/types.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/submariner-io/admiral/pkg/federate" + "github.com/submariner-io/admiral/pkg/ipam" "github.com/submariner-io/admiral/pkg/syncer" "github.com/submariner-io/admiral/pkg/syncer/broker" "github.com/submariner-io/admiral/pkg/watcher" @@ -36,11 +37,12 @@ import ( ) const ( - ExportFailedReason = "ExportFailed" - TypeConflictReason = "ConflictingType" - PortConflictReason = "ConflictingPorts" - SessionAffinityConflictReason = "ConflictingSessionAffinity" - SessionAffinityConfigConflictReason = "ConflictingSessionAffinityConfig" + 
ExportFailedReason = "ExportFailed" + TypeConflictReason = "ConflictingType" + PortConflictReason = "ConflictingPorts" + SessionAffinityConflictReason = "ConflictingSessionAffinity" + SessionAffinityConfigConflictReason = "ConflictingSessionAffinityConfig" + ClusterSetIPEnablementConflictReason = "ConflictingClusterSetIPEnablement" ) type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object @@ -63,13 +65,15 @@ type Controller struct { } type AgentSpecification struct { - ClusterID string - Namespace string - Verbosity int - GlobalnetEnabled bool `split_words:"true"` - Uninstall bool - HaltOnCertError bool `split_words:"true"` - Debug bool + ClusterID string + Namespace string + Verbosity int + ClustersetIPCidr string `split_words:"true"` + ClustersetIPEnabled bool `split_words:"true"` + GlobalnetEnabled bool `split_words:"true"` + Uninstall bool + HaltOnCertError bool `split_words:"true"` + Debug bool } type ServiceImportAggregator struct { @@ -96,6 +100,8 @@ type ServiceImportController struct { converter converter globalIngressIPCache *globalIngressIPCache localLHEndpointSliceLister EndpointSliceListerFn + clustersetIPPool *ipam.IPPool + clustersetIPEnabled bool } // Each ServiceEndpointSliceController watches for the EndpointSlices that backs a Service and have a ServiceImport. diff --git a/pkg/agent/main.go b/pkg/agent/main.go index 5b2212bb7..3a731e923 100644 --- a/pkg/agent/main.go +++ b/pkg/agent/main.go @@ -25,9 +25,11 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/submariner-io/admiral/pkg/http" + "github.com/submariner-io/admiral/pkg/ipam" "github.com/submariner-io/admiral/pkg/log" "github.com/submariner-io/admiral/pkg/log/kzerolog" "github.com/submariner-io/admiral/pkg/names" + "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/broker" "github.com/submariner-io/admiral/pkg/util" admversion "github.com/submariner-io/admiral/pkg/version" @@ -102,7 +104,7 @@ func main() { klogFlags.Parse(os.Args[1:]) logger.Infof("Arguments: %v", os.Args) - logger.Infof("AgentSpec: %#v", agentSpec) + logger.Infof("AgentSpec: %s", resource.ToJSON(agentSpec)) util.AddCertificateErrorHandler(agentSpec.HaltOnCertError) @@ -121,6 +123,13 @@ func main() { // set up signals so we handle the first shutdown signal gracefully ctx := signals.SetupSignalHandler() + var ipPool *ipam.IPPool + + if agentSpec.ClustersetIPCidr != "" { + ipPool, err = ipam.NewIPPool(agentSpec.ClustersetIPCidr, nil) + exitOnError(err, "Error creating clusterset IP pool") + } + lightHouseAgent, err := controller.New(&agentSpec, broker.SyncerConfig{ LocalRestConfig: cfg, LocalClient: localClient, @@ -129,6 +138,7 @@ func main() { }, controller.AgentConfig{ ServiceImportCounterName: "submariner_service_import", ServiceExportCounterName: "submariner_service_export", + IPPool: ipPool, }) exitOnError(err, "Failed to create lighthouse agent") diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index cdb49963b..e2e26c5b1 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -30,6 +30,8 @@ const ( LabelSourceName = "lighthouse.submariner.io/source-name" PublishNotReadyAddresses = "lighthouse.submariner.io/publish-not-ready-addresses" GlobalnetEnabled = "lighthouse.submariner.io/globalnet-enabled" + UseClustersetIP = "lighthouse.submariner.io/use-clusterset-ip" + ClustersetIPAllocatedBy = "lighthouse.submariner.io/clusterset-ip-allocated-by" ) const ServiceExportReady v1alpha1.ServiceExportConditionType = "Ready"
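
The enablement and allocation flow implemented above can be summarized end to end. The following is a minimal sketch, not agent code: it assumes only the admiral ipam calls already used in this patch (NewIPPool, Allocate, Release) and reads the env vars directly, whereas the agent itself binds SUBMARINER_CLUSTERSET_IP_ENABLED and SUBMARINER_CLUSTERSET_IP_CIDR through envconfig into AgentSpecification.

```go
package main

import (
	"fmt"
	"os"
	"strconv"

	"github.com/submariner-io/admiral/pkg/ipam"
)

// useClusterSetIP mirrors determineUseClusterSetIP in service_import.go: the
// per-service annotation, when present, overrides the global enablement flag,
// and allocation also requires a configured CIDR (i.e. a non-nil pool).
func useClusterSetIP(annotation string, globalEnabled bool, pool *ipam.IPPool) bool {
	enabled := globalEnabled
	if annotation != "" {
		enabled = annotation == strconv.FormatBool(true)
	}

	return enabled && pool != nil
}

func main() {
	// With no CIDR, clusterset IP functionality is disabled.
	cidr := os.Getenv("SUBMARINER_CLUSTERSET_IP_CIDR")
	if cidr == "" {
		fmt.Println("clusterset IP functionality disabled")
		return
	}

	pool, err := ipam.NewIPPool(cidr, nil)
	if err != nil {
		panic(err)
	}

	globalEnabled := os.Getenv("SUBMARINER_CLUSTERSET_IP_ENABLED") == strconv.FormatBool(true)

	// The "true" here stands in for the exporting cluster's ServiceExport
	// annotation "lighthouse.submariner.io/use-clusterset-ip".
	if useClusterSetIP("true", globalEnabled, pool) {
		// Allocation happens once, on creation of the aggregated ServiceImport.
		ips, err := pool.Allocate(1)
		if err != nil {
			panic(err)
		}

		fmt.Println("allocated clusterset IP:", ips[0])

		// The IP is released only when the aggregated ServiceImport is deleted,
		// i.e. when the service has been unexported from all clusters.
		_ = pool.Release(ips...)
	}
}
```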
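The restart path follows the same shape: reserveAggregatedServiceImportIPs walks the local aggregated ServiceImports and re-reserves any IP that falls within this cluster's pool CIDR so subsequent allocations cannot hand it out again. A minimal sketch under the same assumptions; the CIDR and sample IPs are hypothetical (the first is inside the pool, the second is not).

```go
package main

import (
	"fmt"
	"net"

	"github.com/submariner-io/admiral/pkg/ipam"
)

// inPoolCIDR mirrors isIPInClustersetCIDR: a ServiceImport's IP belongs to this
// cluster's pool only if it parses and falls within the pool's CIDR. The CIDR
// comes from the pool itself, so the ParseCIDR error is ignored as in the patch.
func inPoolCIDR(pool *ipam.IPPool, ips []string) bool {
	if pool == nil || len(ips) == 0 {
		return false
	}

	ip := net.ParseIP(ips[0])
	_, cidr, _ := net.ParseCIDR(pool.GetCIDR())

	return ip != nil && cidr.Contains(ip)
}

func main() {
	pool, err := ipam.NewIPPool("243.10.1.0/24", nil)
	if err != nil {
		panic(err)
	}

	// Stand-ins for the Spec.IPs of local aggregated ServiceImports found on restart.
	for _, ips := range [][]string{{"243.10.1.5"}, {"10.0.0.7"}} {
		if inPoolCIDR(pool, ips) {
			if err := pool.Reserve(ips[0]); err != nil {
				fmt.Println("unable to reserve", ips[0], ":", err)
			}
		}
	}
}
```

As in the patch, a reservation failure is logged rather than fatal, since an unreservable IP most likely indicates it was already handed out or lies outside the pool.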