Skip to content

Commit

Permalink
Implement clusterset VIP for ServiceImports in LH agent
Browse files Browse the repository at this point in the history
As per
https://github.com/submariner-io/enhancements/blob/devel/lighthouse/mcs-compliance.md

Highlights:

- The clusterset IP enabled flag and clusterset IP CIDR are passed vi
  env vars SUBMARINER_CLUSTERSET_IP_ENABLED and
  SUBMARINER_CLUSTERSET_IP_CIDR.

- If no CIDR is passed, then clusterset IP functionality is disabled

- Clusterset IP allocation is determined by the Serviceexport annotation
  "lighthouse.submariner.io/use-clusterset-ip" or, if not set, the
  global enablement flag.

- Clusterset allocation is done only on creation of the aggregated
  ServiceImport. The
  "lighthouse.submariner.io/clusterset-ip-allocated-by" annotation
  is added by the allocating cluster.

- The "lighthouse.submariner.io/use-clusterset-ip" annotation is always
  set on the aggregated ServiceImport.

- If an exporting cluster's local clusterset enablement does not match
  the annotation setting on the aggregated ServiceImport, a
  "ConflictingClusterSetIPEnablement" Conflict condition is set.

- If the allocating cluster's service is unexported, the ServiceImport's
  clusterset IP remains and is not release to the pool.

- When a service is unexported from all clusters and the aggregated
  ServiceImport is deleted, the allocating cluster releases the
  clusterset IP back to the pool.

- On restart, before starting the syncers, the controller iterates thru
  all the local aggregated ServiceImports and reserves any clusterset
  IPs that were allocated from its pool, ie that are in its CIDR range.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Sep 6, 2024
1 parent 687c11c commit cf517f8
Show file tree
Hide file tree
Showing 11 changed files with 476 additions and 44 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
Expand Down
15 changes: 13 additions & 2 deletions pkg/agent/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/ipam"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/syncer"
"github.com/submariner-io/admiral/pkg/syncer/broker"
Expand All @@ -52,12 +53,13 @@ const (
type AgentConfig struct {
ServiceImportCounterName string
ServiceExportCounterName string
IPPool *ipam.IPPool
}

var logger = log.Logger{Logger: logf.Log.WithName("agent")}

//nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer.
func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricNames AgentConfig) (*Controller, error) {
func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, agentConfig AgentConfig) (*Controller, error) {
if errs := validations.IsDNS1123Label(spec.ClusterID); len(errs) > 0 {
return nil, errors.Errorf("%s is not a valid ClusterID %v", spec.ClusterID, errs)
}
Expand Down Expand Up @@ -124,7 +126,7 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN
return nil, err
}

agentController.serviceImportController, err = newServiceImportController(spec, syncerMetricNames, syncerConf,
agentController.serviceImportController, err = newServiceImportController(spec, agentConfig, syncerConf,
agentController.endpointSliceController.syncer.GetBrokerClient(),
agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient,
func(selector k8slabels.Selector) []runtime.Object {
Expand Down Expand Up @@ -244,6 +246,10 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op
serviceImport := a.newServiceImport(svcExport.Name, svcExport.Namespace)
serviceImport.Annotations[constants.PublishNotReadyAddresses] = strconv.FormatBool(svc.Spec.PublishNotReadyAddresses)

if svcExport.Annotations[constants.UseClustersetIP] != "" {
serviceImport.Annotations[constants.UseClustersetIP] = svcExport.Annotations[constants.UseClustersetIP]
}

serviceImport.Spec = mcsv1a1.ServiceImportSpec{
Ports: []mcsv1a1.ServicePort{},
Type: svcType,
Expand Down Expand Up @@ -303,6 +309,11 @@ func getServiceImportType(service *corev1.Service) (mcsv1a1.ServiceImportType, b
}

func (a *Controller) shouldProcessServiceExportUpdate(oldObj, newObj *unstructured.Unstructured) bool {
// To reduce unnecessary churn, only process a ServiceExport update if the UseClustersetIP annotation or the Valid condition changed.
if oldObj.GetAnnotations()[constants.UseClustersetIP] != newObj.GetAnnotations()[constants.UseClustersetIP] {
return true
}

oldValidCond := FindServiceExportStatusCondition(a.toServiceExport(oldObj).Status.Conditions, mcsv1a1.ServiceExportValid)
newValidCond := FindServiceExportStatusCondition(a.toServiceExport(newObj).Status.Conditions, mcsv1a1.ServiceExportValid)

Expand Down
173 changes: 170 additions & 3 deletions pkg/agent/controller/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ limitations under the License.
package controller_test

import (
"context"
"fmt"
"strconv"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/fake"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/syncer/test"
Expand Down Expand Up @@ -214,6 +216,90 @@ func testClusterIPServiceInOneCluster() {
})
})

Context("with clusterset IP enabled", func() {
BeforeEach(func() {
t.useClusterSetIP = true
})

JustBeforeEach(func() {
t.cluster1.createService()
t.cluster1.createServiceExport()
})

Context("via ServiceExport annotation", func() {
BeforeEach(func() {
t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)}
})

It("should allocate an IP for the aggregated ServiceImport and release the IP when unexported", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)

localSI := getServiceImport(t.cluster1.localServiceImportClient, t.cluster1.service.Namespace, t.cluster1.service.Name)
Expect(localSI.Annotations).To(HaveKeyWithValue(constants.ClustersetIPAllocatedBy, t.cluster1.clusterID))

By("Unexporting the service")

t.cluster1.deleteServiceExport()

Eventually(func() error {
return t.ipPool.Reserve(localSI.Spec.IPs...)
}).Should(Succeed(), "ServiceImport IP was not released")
})

Context("but with no IP pool specified", func() {
BeforeEach(func() {
t.useClusterSetIP = false
t.ipPool = nil
})

It("should not set the IP on the aggregated ServiceImport", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)
})
})

Context("with the IP pool initially exhausted", func() {
var ips []string

BeforeEach(func() {
var err error

ips, err = t.ipPool.Allocate(t.ipPool.Size())
Expect(err).To(Succeed())
})

It("should eventually set the IP on the aggregated ServiceImport", func() {
t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse,
controller.ExportFailedReason))

_ = t.ipPool.Release(ips...)

t.awaitNonHeadlessServiceExported(&t.cluster1)
})
})
})

Context("via the global setting", func() {
BeforeEach(func() {
t.cluster1.agentSpec.ClustersetIPEnabled = true
})

It("should set the IP on the aggregated ServiceImport", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)
})

Context("but disabled via ServiceExport annotation", func() {
BeforeEach(func() {
t.useClusterSetIP = false
t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(false)}
})

It("should not set the IP on the aggregated ServiceImport", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)
})
})
})
})

When("two Services with the same name in different namespaces are exported", func() {
It("should correctly export both services", func() {
t.cluster1.createService()
Expand Down Expand Up @@ -288,7 +374,7 @@ func testClusterIPServiceInOneCluster() {
test.CreateResource(t.cluster1.dynamicServiceClientFor().Namespace(service.Namespace), service)
test.CreateResource(serviceExportClientFor(t.cluster1.localDynClient, service.Namespace), serviceExport)

awaitServiceImport(t.cluster2.localServiceImportClient, expServiceImport)
awaitServiceImport(t.cluster2.localServiceImportClient, expServiceImport, t.ipPool)
awaitEndpointSlice(endpointSliceClientFor(t.cluster2.localDynClient, service.Namespace), service.Name, expEndpointSlice)

// Ensure the resources for the first Service weren't overwritten
Expand Down Expand Up @@ -346,7 +432,7 @@ func testClusterIPServiceInOneCluster() {
},
}

awaitServiceImport(t.cluster1.localServiceImportClient, expServiceImport)
awaitServiceImport(t.cluster1.localServiceImportClient, expServiceImport, t.ipPool)

testutil.EnsureNoResource(resource.ForDynamic(t.cluster2.localServiceImportClient.Namespace(
t.cluster1.service.Namespace)), t.cluster1.service.Name)
Expand Down Expand Up @@ -402,7 +488,7 @@ func testClusterIPServiceInTwoClusters() {
t.cluster1.start(t, *t.syncerConfig)

// Sleep a little before starting the second cluster to ensure its resource CreationTimestamps will be
// later than the first cluster to ensure conflict checking in deterministic.
// later than the first cluster to ensure conflict checking is deterministic.
time.Sleep(100 * time.Millisecond)

t.cluster2.createServiceEndpointSlices()
Expand Down Expand Up @@ -671,6 +757,87 @@ func testClusterIPServiceInTwoClusters() {
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})
})

Context("with clusterset IP enabled on the first exporting cluster but not the second", func() {
BeforeEach(func() {
t.useClusterSetIP = true
t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)}
})

JustBeforeEach(func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
})

It("should set the Conflict status condition on the second cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.ClusterSetIPEnablementConflictReason))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)

By("Updating the ServiceExport on the second cluster")

se, err := t.cluster2.localServiceExportClient.Get(context.TODO(), t.cluster2.serviceExport.Name, metav1.GetOptions{})
Expect(err).To(Succeed())

se.SetAnnotations(map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)})
test.UpdateResource(t.cluster2.localServiceExportClient, se)

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})

It("should not release the allocated clusterset IP until all clusters have unexported", func() {
localSI := getServiceImport(t.cluster1.localServiceImportClient, t.cluster1.service.Namespace, t.cluster1.service.Name)

By("Unexporting service on the first cluster")

t.cluster1.deleteServiceExport()

t.awaitNoEndpointSlice(&t.cluster1)
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)

Consistently(func() error {
return t.ipPool.Reserve(localSI.Spec.IPs...)
}).ShouldNot(Succeed(), "ServiceImport IP was released")

By("Unexporting service on the second cluster")

t.cluster2.deleteServiceExport()

t.awaitServiceUnexported(&t.cluster2)

Eventually(func() error {
return t.ipPool.Reserve(localSI.Spec.IPs...)
}).Should(Succeed(), "ServiceImport IP was not released")
})
})

Context("with clusterset IP disabled on the first exporting cluster but enabled on the second", func() {
BeforeEach(func() {
t.cluster2.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)}
})

It("should set the Conflict status condition on the second cluster", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.ClusterSetIPEnablementConflictReason))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})
})

Context("with clusterset IP enabled on both clusters", func() {
BeforeEach(func() {
t.useClusterSetIP = true
t.cluster1.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)}
t.cluster2.serviceExport.Annotations = map[string]string{constants.UseClustersetIP: strconv.FormatBool(true)}
})

Specify("the first cluster should allocate the clusterset IP", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)

localSI := getServiceImport(t.cluster1.localServiceImportClient, t.cluster1.service.Namespace, t.cluster1.service.Name)
Expect(localSI.Annotations).To(HaveKeyWithValue(constants.ClustersetIPAllocatedBy, t.cluster1.clusterID))

t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})
})
}

func testClusterIPServiceWithMultipleEPS() {
Expand Down
Loading

0 comments on commit cf517f8

Please sign in to comment.