Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement conflict checking for SessionAffinity #1632

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 193 additions & 14 deletions pkg/agent/controller/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package controller_test
import (
"fmt"
"strconv"
"time"

. "github.com/onsi/ginkgo/v2"
"github.com/submariner-io/admiral/pkg/fake"
Expand Down Expand Up @@ -201,6 +202,9 @@ func testClusterIPServiceInOneCluster() {
t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}

t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
})

It("should be propagated to the ServiceImport", func() {
Expand Down Expand Up @@ -388,39 +392,55 @@ func testClusterIPServiceInTwoClusters() {

BeforeEach(func() {
t = newTestDiver()

t.cluster2.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.cluster2.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}
})

JustBeforeEach(func() {
t.cluster1.createServiceEndpointSlices()
t.cluster1.createService()
t.cluster1.createServiceExport()

t.justBeforeEach()
t.cluster1.start(t, *t.syncerConfig)

// Sleep a little before starting the second cluster to ensure its resource CreationTimestamps will be
// later than the first cluster to ensure conflict checking in deterministic.
time.Sleep(100 * time.Millisecond)

t.cluster2.createServiceEndpointSlices()
t.cluster2.createService()
t.cluster2.createServiceExport()

t.cluster2.start(t, *t.syncerConfig)
})

AfterEach(func() {
t.afterEach()
})

It("should export the service in both clusters", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
t.cluster1.ensureLastServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
Context("", func() {
BeforeEach(func() {
t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}

t.cluster2.service.Spec.SessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.cluster2.service.Spec.SessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig

By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")
t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
})

t.cluster1.ensureNoServiceExportActions()
It("should export the service in both clusters", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
t.cluster1.ensureLastServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)

By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")

t.cluster1.ensureNoServiceExportActions()
})
})

Context("with differing ports", func() {
Expand Down Expand Up @@ -492,6 +512,165 @@ func testClusterIPServiceInTwoClusters() {
})
})
})

Context("with differing service SessionAffinity", func() {
BeforeEach(func() {
t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
})

It("should resolve the conflict and set the Conflict status condition on the conflicting cluster", func() {
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConflictReason))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})

Context("initially and after updating the SessionAffinity on the conflicting cluster to match", func() {
It("should clear the Conflict status condition on the conflicting cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConflictReason))

By("Updating the SessionAffinity on the service")

t.cluster2.service.Spec.SessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.cluster2.updateService()

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after updating the SessionAffinity on the oldest exporting cluster to match", func() {
It("should clear the Conflict status condition on the conflicting cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConflictReason))

By("Updating the SessionAffinity on the service")

t.cluster1.service.Spec.SessionAffinity = t.cluster2.service.Spec.SessionAffinity
t.cluster1.updateService()

t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after the service on the oldest exporting cluster is unexported", func() {
It("should update the SessionAffinity on the aggregated ServiceImport and clear the Conflict status condition", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConflictReason))

By("Unexporting the service")

t.cluster1.deleteServiceExport()

t.aggregatedSessionAffinity = t.cluster2.service.Spec.SessionAffinity
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)
t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})
})

Context("with differing service SessionAffinityConfig", func() {
BeforeEach(func() {
t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.cluster2.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity

t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}
t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
})

It("should resolve the conflict and set the Conflict status condition on the conflicting cluster", func() {
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConfigConflictReason))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})

Context("initially and after updating the SessionAffinityConfig on the conflicting cluster to match", func() {
It("should clear the Conflict status condition on the conflicting cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConfigConflictReason))

By("Updating the SessionAffinityConfig on the service")

t.cluster2.service.Spec.SessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
t.cluster2.updateService()

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after updating the SessionAffinityConfig on the oldest exporting cluster to match", func() {
It("should clear the Conflict status condition on the conflicting cluster", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConfigConflictReason))

By("Updating the SessionAffinityConfig on the service")

t.cluster1.service.Spec.SessionAffinityConfig = t.cluster2.service.Spec.SessionAffinityConfig
t.cluster1.updateService()

t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})

Context("initially and after the service on the oldest exporting cluster is unexported", func() {
BeforeEach(func() {
t.cluster2.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(20))},
}
})

It("should update the SessionAffinity on the aggregated ServiceImport and clear the Conflict status condition", func() {
t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
controller.SessionAffinityConfigConflictReason))

By("Unexporting the service")

t.cluster1.deleteServiceExport()

t.aggregatedSessionAffinityConfig = t.cluster2.service.Spec.SessionAffinityConfig
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)
t.cluster2.awaitServiceExportCondition(noConflictCondition)
})
})
})

Context("with differing service SessionAffinity and SessionAffinityConfig", func() {
BeforeEach(func() {
t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity

t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
}
t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
})

It("should resolve the conflicts and set the Conflict status condition on the conflicting cluster", func() {
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
&t.cluster1, &t.cluster2)

t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
fmt.Sprintf("%s,%s", controller.SessionAffinityConflictReason, controller.SessionAffinityConfigConflictReason)))
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})
})
}

func testClusterIPServiceWithMultipleEPS() {
Expand Down
60 changes: 34 additions & 26 deletions pkg/agent/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,18 @@ type cluster struct {
}

type testDriver struct {
cluster1 cluster
cluster2 cluster
brokerServiceImportClient dynamic.NamespaceableResourceInterface
brokerEndpointSliceClient dynamic.ResourceInterface
brokerEndpointSliceReactor *fake.FailingReactor
stopCh chan struct{}
syncerConfig *broker.SyncerConfig
doStart bool
brokerServiceImportReactor *fake.FailingReactor
aggregatedServicePorts []mcsv1a1.ServicePort
cluster1 cluster
cluster2 cluster
brokerServiceImportClient dynamic.NamespaceableResourceInterface
brokerEndpointSliceClient dynamic.ResourceInterface
brokerEndpointSliceReactor *fake.FailingReactor
stopCh chan struct{}
syncerConfig *broker.SyncerConfig
doStart bool
brokerServiceImportReactor *fake.FailingReactor
aggregatedServicePorts []mcsv1a1.ServicePort
aggregatedSessionAffinity corev1.ServiceAffinity
aggregatedSessionAffinityConfig *corev1.SessionAffinityConfig
}

func newTestDiver() *testDriver {
Expand All @@ -163,7 +165,8 @@ func newTestDiver() *testDriver {
fake.AddBasicReactors(&brokerClient.Fake)

t := &testDriver{
aggregatedServicePorts: []mcsv1a1.ServicePort{port1, port2},
aggregatedServicePorts: []mcsv1a1.ServicePort{port1, port2},
aggregatedSessionAffinity: corev1.ServiceAffinityNone,
cluster1: cluster{
clusterID: clusterID1,
agentSpec: controller.AgentSpecification{
Expand Down Expand Up @@ -595,8 +598,7 @@ func (c *cluster) findLocalServiceImport() *mcsv1a1.ServiceImport {
for i := range list.Items {
if list.Items[i].GetLabels()[mcsv1a1.LabelServiceName] == c.service.Name &&
list.Items[i].GetLabels()[constants.LabelSourceNamespace] == c.service.Namespace {
serviceImport := &mcsv1a1.ServiceImport{}
Expect(scheme.Scheme.Convert(&list.Items[i], serviceImport, nil)).To(Succeed())
serviceImport := toServiceImport(&list.Items[i])

return serviceImport
}
Expand Down Expand Up @@ -645,8 +647,7 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected
return false, nil
}

serviceImport = &mcsv1a1.ServiceImport{}
Expect(scheme.Scheme.Convert(obj, serviceImport, nil)).To(Succeed())
serviceImport = toServiceImport(obj)

sortSlices(serviceImport)

Expand All @@ -667,6 +668,13 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected
Expect(serviceImport.Labels).To(BeEmpty())
}

func getServiceImport(client dynamic.NamespaceableResourceInterface, namespace, name string) *mcsv1a1.ServiceImport {
obj, err := client.Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
Expect(err).To(Succeed())

return toServiceImport(obj)
}

func findEndpointSlices(client dynamic.ResourceInterface, namespace, name, clusterID string) []*discovery.EndpointSlice {
list, err := client.List(context.TODO(), metav1.ListOptions{})
if resource.IsMissingNamespaceErr(err) {
Expand Down Expand Up @@ -777,9 +785,10 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp
Namespace: test.RemoteNamespace,
},
Spec: mcsv1a1.ServiceImportSpec{
Type: sType,
Ports: []mcsv1a1.ServicePort{},
SessionAffinity: corev1.ServiceAffinityNone,
Type: sType,
Ports: []mcsv1a1.ServicePort{},
SessionAffinity: t.aggregatedSessionAffinity,
SessionAffinityConfig: t.aggregatedSessionAffinityConfig,
},
}

Expand All @@ -791,14 +800,6 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp
for _, c := range clusters {
expServiceImport.Status.Clusters = append(expServiceImport.Status.Clusters,
mcsv1a1.ClusterStatus{Cluster: c.clusterID})

if c.service.Spec.SessionAffinity != corev1.ServiceAffinityNone {
expServiceImport.Spec.SessionAffinity = c.service.Spec.SessionAffinity
}

if c.service.Spec.SessionAffinityConfig != nil {
expServiceImport.Spec.SessionAffinityConfig = c.service.Spec.SessionAffinityConfig
}
}
}

Expand Down Expand Up @@ -938,6 +939,13 @@ func toServiceExport(obj interface{}) *mcsv1a1.ServiceExport {
return se
}

func toServiceImport(obj interface{}) *mcsv1a1.ServiceImport {
si := &mcsv1a1.ServiceImport{}
Expect(scheme.Scheme.Convert(obj, si, nil)).To(Succeed())

return si
}

func (t *testDriver) awaitNonHeadlessServiceExported(clusters ...*cluster) {
t.awaitServiceExported(mcsv1a1.ClusterSetIP, clusters...)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (
fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+
"The service will expose the intersection of all the ports: %s",
fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts))))
} else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil {
} else if c.serviceExportClient.hasCondition(name, namespace, mcsv1a1.ServiceExportConflict, PortConflictReason) {
c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition(
mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, ""))
}
Expand Down
Loading
Loading