diff --git a/coredns/resolver/service_info.go b/coredns/resolver/service_info.go index abdea075..f3c8c985 100644 --- a/coredns/resolver/service_info.go +++ b/coredns/resolver/service_info.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/submariner-io/admiral/pkg/slices" + "k8s.io/utils/ptr" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -44,7 +45,7 @@ func (si *serviceInfo) mergePorts() { si.ports = info.endpointRecords[0].Ports } else { si.ports = slices.Intersect(si.ports, info.endpointRecords[0].Ports, func(p mcsv1a1.ServicePort) string { - return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port) + return fmt.Sprintf("%s:%s:%d:%s", p.Name, p.Protocol, p.Port, ptr.Deref(p.AppProtocol, "")) }) } } diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index d00f0509..43e6ced8 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -41,6 +41,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" validations "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/utils/ptr" logf "sigs.k8s.io/controller-runtime/pkg/log" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -479,9 +480,9 @@ func (c converter) toServicePorts(from []discovery.EndpointPort) []mcsv1a1.Servi to := make([]mcsv1a1.ServicePort, len(from)) for i := range from { to[i] = mcsv1a1.ServicePort{ - Name: *from[i].Name, - Protocol: *from[i].Protocol, - Port: *from[i].Port, + Name: ptr.Deref(from[i].Name, ""), + Protocol: ptr.Deref(from[i].Protocol, ""), + Port: ptr.Deref(from[i].Port, 0), AppProtocol: from[i].AppProtocol, } } diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go index 0fe57c08..a16bb460 100644 --- a/pkg/agent/controller/clusterip_service_test.go +++ b/pkg/agent/controller/clusterip_service_test.go @@ -572,6 +572,22 @@ func testClusterIPServiceInTwoClusters() { }) }) + Context("with conflicting ports", func() { + BeforeEach(func() { + t.cluster2.service.Spec.Ports = []corev1.ServicePort{t.cluster1.service.Spec.Ports[0], toServicePort(port3)} + t.cluster2.service.Spec.Ports[0].Port++ + t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2, port3} + }) + + It("should correctly set the ports in the aggregated ServiceImport and set the Conflict status condition", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) + + condition := newServiceExportConflictCondition(controller.PortConflictReason) + t.cluster1.awaitServiceExportCondition(condition) + t.cluster2.awaitServiceExportCondition(condition) + }) + }) + Context("with differing service types", func() { BeforeEach(func() { t.cluster2.service.Spec.ClusterIP = corev1.ClusterIPNone diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 564c92b2..761aad9f 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -39,6 +39,7 @@ import ( k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/ptr" "k8s.io/utils/set" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -243,6 +244,10 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( mcsv1a1.LabelServiceName: name, })) + servicePortKey := func(p mcsv1a1.ServicePort) string { + return fmt.Sprintf("%s:%s:%d:%s", p.Name, p.Protocol, p.Port, ptr.Deref(p.AppProtocol, "")) + } + var prevServicePorts []mcsv1a1.ServicePort var intersectedServicePorts []mcsv1a1.ServicePort clusterNames := set.New[string]() @@ -321,7 +326,8 @@ func (c *EndpointSliceController) enqueueForConflictCheck(ctx context.Context, e func servicePortsToString(p []mcsv1a1.ServicePort) string { s := make([]string, len(p)) for i := range p { - s[i] = fmt.Sprintf("[name: %s, protocol: %s, port: %v]", p[i].Name, p[i].Protocol, p[i].Port) + s[i] = fmt.Sprintf("[name: %s, protocol: %s, port: %v, appProtocol: %q]", p[i].Name, p[i].Protocol, p[i].Port, + ptr.Deref(p[i].AppProtocol, "")) } return strings.Join(s, ", ") diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index e54018f7..c988ccdf 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -25,7 +25,6 @@ import ( "net" "reflect" "strconv" - "strings" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -699,28 +698,12 @@ func (c *ServiceImportController) localServiceImportLister(transform func(si *mc } func findClusterWithOldestTimestamp(from map[string]string) string { - oldest := int64(math.MaxInt64) - foundCluster := "" - - for k, v := range from { - cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix) - if !found { - continue - } - - t, err := strconv.ParseInt(v, 10, 64) - if err != nil { - logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster) - continue - } - - if t < oldest || (t == oldest && cluster < foundCluster) { - foundCluster = cluster - oldest = t - } + names := getClusterNamesOrderedByTimestamp(from) + if len(names) > 0 { + return names[0] } - return foundCluster + return "" } func toSessionAffinityConfigString(c *corev1.SessionAffinityConfig) string { diff --git a/pkg/agent/controller/service_import_aggregator.go b/pkg/agent/controller/service_import_aggregator.go index afb34b19..76d68339 100644 --- a/pkg/agent/controller/service_import_aggregator.go +++ b/pkg/agent/controller/service_import_aggregator.go @@ -20,7 +20,9 @@ package controller import ( "context" - "fmt" + goslices "slices" + "strconv" + "strings" "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" @@ -76,14 +78,25 @@ func (a *ServiceImportAggregator) setServicePorts(ctx context.Context, si *mcsv1 serviceNamespace, serviceName) } - si.Spec.Ports = make([]mcsv1a1.ServicePort, 0) + portsByCluster := map[string][]mcsv1a1.ServicePort{} for i := range list.Items { eps := a.converter.toEndpointSlice(&list.Items[i]) - si.Spec.Ports = slices.Union(si.Spec.Ports, a.converter.toServicePorts(eps.Ports), servicePortKey) + portsByCluster[eps.Labels[constants.MCSLabelSourceCluster]] = a.converter.toServicePorts(eps.Ports) + } + + // Sort the clusters by their ServiceExport timestamps stored in the ServiceImport annotations so conflicting ports are + // resolved by taking the oldest as per the MCS spec's conflict resolution policy. + + si.Spec.Ports = make([]mcsv1a1.ServicePort, 0) + for _, clusterName := range getClusterNamesOrderedByTimestamp(si.Annotations) { + ports := portsByCluster[clusterName] + si.Spec.Ports = slices.Union(si.Spec.Ports, ports, func(p mcsv1a1.ServicePort) string { + return p.Name + }) } - logger.V(log.DEBUG).Infof("Calculated ports for aggregated ServiceImport %q: %#v", si.Name, si.Spec.Ports) + logger.V(log.DEBUG).Infof("Calculated ports for aggregated ServiceImport %q: %s", si.Name, servicePortsToString(si.Spec.Ports)) return nil } @@ -152,6 +165,45 @@ func clusterStatusKey(c mcsv1a1.ClusterStatus) string { return c.Cluster } -func servicePortKey(p mcsv1a1.ServicePort) string { - return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port) +type clusterSortInfo struct { + name string + timestamp int64 +} + +func getClusterNamesOrderedByTimestamp(from map[string]string) []string { + info := make([]clusterSortInfo, 0, len(from)) + + for k, v := range from { + cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix) + if !found { + continue + } + + t, err := strconv.ParseInt(v, 10, 64) + if err != nil { + logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster) + continue + } + + info = append(info, clusterSortInfo{name: cluster, timestamp: t}) + } + + goslices.SortFunc(info, func(a, b clusterSortInfo) int { + if a.timestamp == b.timestamp { + return strings.Compare(a.name, b.name) + } + + if a.timestamp < b.timestamp { + return -1 + } + + return 1 + }) + + sortedNames := make([]string, len(info)) + for i := range info { + sortedNames[i] = info[i].name + } + + return sortedNames }