From f4e54c2dcbbbd9edb0222a547e9dd8622fd8716a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 24 Sep 2024 14:21:06 -0400 Subject: [PATCH] Handle conflicting service ports when merging per MCS spec The spec states that if the properties of service ports between clusters don't match, the clusterset service should expose the union of the ports. We're doing this however we're not properly handle conflicts as per the spec. If multiple clusters have a matching port by name but the other properties don't match, it should be considered a conflict and the conflict resolution policy should be applied, ie the port from the cluster with the oldest export timestamp should be used. When merging, we need to sort the ports by cluster timestamp and use just the port name when computing the union rather than all the port properties as we did before. Also when checking for conflicts, we should also check the AppProtocol property. Signed-off-by: Tom Pantelis --- coredns/resolver/service_info.go | 3 +- pkg/agent/controller/agent.go | 7 +- .../controller/clusterip_service_test.go | 16 +++++ pkg/agent/controller/endpoint_slice.go | 8 ++- pkg/agent/controller/service_import.go | 25 ++------ .../controller/service_import_aggregator.go | 64 +++++++++++++++++-- 6 files changed, 91 insertions(+), 32 deletions(-) diff --git a/coredns/resolver/service_info.go b/coredns/resolver/service_info.go index abdea075..f3c8c985 100644 --- a/coredns/resolver/service_info.go +++ b/coredns/resolver/service_info.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/submariner-io/admiral/pkg/slices" + "k8s.io/utils/ptr" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -44,7 +45,7 @@ func (si *serviceInfo) mergePorts() { si.ports = info.endpointRecords[0].Ports } else { si.ports = slices.Intersect(si.ports, info.endpointRecords[0].Ports, func(p mcsv1a1.ServicePort) string { - return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port) + return fmt.Sprintf("%s:%s:%d:%s", p.Name, p.Protocol, p.Port, ptr.Deref(p.AppProtocol, "")) }) } } diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index d00f0509..43e6ced8 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -41,6 +41,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" validations "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/utils/ptr" logf "sigs.k8s.io/controller-runtime/pkg/log" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -479,9 +480,9 @@ func (c converter) toServicePorts(from []discovery.EndpointPort) []mcsv1a1.Servi to := make([]mcsv1a1.ServicePort, len(from)) for i := range from { to[i] = mcsv1a1.ServicePort{ - Name: *from[i].Name, - Protocol: *from[i].Protocol, - Port: *from[i].Port, + Name: ptr.Deref(from[i].Name, ""), + Protocol: ptr.Deref(from[i].Protocol, ""), + Port: ptr.Deref(from[i].Port, 0), AppProtocol: from[i].AppProtocol, } } diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go index 0fe57c08..a16bb460 100644 --- a/pkg/agent/controller/clusterip_service_test.go +++ b/pkg/agent/controller/clusterip_service_test.go @@ -572,6 +572,22 @@ func testClusterIPServiceInTwoClusters() { }) }) + Context("with conflicting ports", func() { + BeforeEach(func() { + t.cluster2.service.Spec.Ports = []corev1.ServicePort{t.cluster1.service.Spec.Ports[0], toServicePort(port3)} + t.cluster2.service.Spec.Ports[0].Port++ + t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2, port3} + }) + + It("should correctly set the ports in the aggregated ServiceImport and set the Conflict status condition", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) + + condition := newServiceExportConflictCondition(controller.PortConflictReason) + t.cluster1.awaitServiceExportCondition(condition) + t.cluster2.awaitServiceExportCondition(condition) + }) + }) + Context("with differing service types", func() { BeforeEach(func() { t.cluster2.service.Spec.ClusterIP = corev1.ClusterIPNone diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 564c92b2..761aad9f 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -39,6 +39,7 @@ import ( k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/ptr" "k8s.io/utils/set" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -243,6 +244,10 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( mcsv1a1.LabelServiceName: name, })) + servicePortKey := func(p mcsv1a1.ServicePort) string { + return fmt.Sprintf("%s:%s:%d:%s", p.Name, p.Protocol, p.Port, ptr.Deref(p.AppProtocol, "")) + } + var prevServicePorts []mcsv1a1.ServicePort var intersectedServicePorts []mcsv1a1.ServicePort clusterNames := set.New[string]() @@ -321,7 +326,8 @@ func (c *EndpointSliceController) enqueueForConflictCheck(ctx context.Context, e func servicePortsToString(p []mcsv1a1.ServicePort) string { s := make([]string, len(p)) for i := range p { - s[i] = fmt.Sprintf("[name: %s, protocol: %s, port: %v]", p[i].Name, p[i].Protocol, p[i].Port) + s[i] = fmt.Sprintf("[name: %s, protocol: %s, port: %v, appProtocol: %q]", p[i].Name, p[i].Protocol, p[i].Port, + ptr.Deref(p[i].AppProtocol, "")) } return strings.Join(s, ", ") diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index e54018f7..c988ccdf 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -25,7 +25,6 @@ import ( "net" "reflect" "strconv" - "strings" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -699,28 +698,12 @@ func (c *ServiceImportController) localServiceImportLister(transform func(si *mc } func findClusterWithOldestTimestamp(from map[string]string) string { - oldest := int64(math.MaxInt64) - foundCluster := "" - - for k, v := range from { - cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix) - if !found { - continue - } - - t, err := strconv.ParseInt(v, 10, 64) - if err != nil { - logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster) - continue - } - - if t < oldest || (t == oldest && cluster < foundCluster) { - foundCluster = cluster - oldest = t - } + names := getClusterNamesOrderedByTimestamp(from) + if len(names) > 0 { + return names[0] } - return foundCluster + return "" } func toSessionAffinityConfigString(c *corev1.SessionAffinityConfig) string { diff --git a/pkg/agent/controller/service_import_aggregator.go b/pkg/agent/controller/service_import_aggregator.go index afb34b19..76d68339 100644 --- a/pkg/agent/controller/service_import_aggregator.go +++ b/pkg/agent/controller/service_import_aggregator.go @@ -20,7 +20,9 @@ package controller import ( "context" - "fmt" + goslices "slices" + "strconv" + "strings" "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" @@ -76,14 +78,25 @@ func (a *ServiceImportAggregator) setServicePorts(ctx context.Context, si *mcsv1 serviceNamespace, serviceName) } - si.Spec.Ports = make([]mcsv1a1.ServicePort, 0) + portsByCluster := map[string][]mcsv1a1.ServicePort{} for i := range list.Items { eps := a.converter.toEndpointSlice(&list.Items[i]) - si.Spec.Ports = slices.Union(si.Spec.Ports, a.converter.toServicePorts(eps.Ports), servicePortKey) + portsByCluster[eps.Labels[constants.MCSLabelSourceCluster]] = a.converter.toServicePorts(eps.Ports) + } + + // Sort the clusters by their ServiceExport timestamps stored in the ServiceImport annotations so conflicting ports are + // resolved by taking the oldest as per the MCS spec's conflict resolution policy. + + si.Spec.Ports = make([]mcsv1a1.ServicePort, 0) + for _, clusterName := range getClusterNamesOrderedByTimestamp(si.Annotations) { + ports := portsByCluster[clusterName] + si.Spec.Ports = slices.Union(si.Spec.Ports, ports, func(p mcsv1a1.ServicePort) string { + return p.Name + }) } - logger.V(log.DEBUG).Infof("Calculated ports for aggregated ServiceImport %q: %#v", si.Name, si.Spec.Ports) + logger.V(log.DEBUG).Infof("Calculated ports for aggregated ServiceImport %q: %s", si.Name, servicePortsToString(si.Spec.Ports)) return nil } @@ -152,6 +165,45 @@ func clusterStatusKey(c mcsv1a1.ClusterStatus) string { return c.Cluster } -func servicePortKey(p mcsv1a1.ServicePort) string { - return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port) +type clusterSortInfo struct { + name string + timestamp int64 +} + +func getClusterNamesOrderedByTimestamp(from map[string]string) []string { + info := make([]clusterSortInfo, 0, len(from)) + + for k, v := range from { + cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix) + if !found { + continue + } + + t, err := strconv.ParseInt(v, 10, 64) + if err != nil { + logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster) + continue + } + + info = append(info, clusterSortInfo{name: cluster, timestamp: t}) + } + + goslices.SortFunc(info, func(a, b clusterSortInfo) int { + if a.timestamp == b.timestamp { + return strings.Compare(a.name, b.name) + } + + if a.timestamp < b.timestamp { + return -1 + } + + return 1 + }) + + sortedNames := make([]string, len(info)) + for i := range info { + sortedNames[i] = info[i].name + } + + return sortedNames }