diff --git a/go.mod b/go.mod index ea7a6b77..88546d30 100644 --- a/go.mod +++ b/go.mod @@ -8,14 +8,14 @@ require ( github.com/onsi/gomega v1.33.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 - github.com/submariner-io/admiral v0.18.0-m3 + github.com/submariner-io/admiral v0.18.0-m3.0.20240514134824-b20caee1a104 github.com/submariner-io/shipyard v0.18.0-m3 k8s.io/api v0.30.0 k8s.io/apimachinery v0.30.0 k8s.io/client-go v0.30.0 k8s.io/klog/v2 v2.120.1 k8s.io/utils v0.0.0-20230726121419-3b25d923346b - sigs.k8s.io/controller-runtime v0.18.0 + sigs.k8s.io/controller-runtime v0.18.1 sigs.k8s.io/mcs-api v0.1.0 ) diff --git a/go.sum b/go.sum index 8f3941d5..a1fd070c 100644 --- a/go.sum +++ b/go.sum @@ -386,8 +386,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/submariner-io/admiral v0.18.0-m3 h1:L87Jwb4oYAHbfuHxJ0BlKucZT5lP/OdyAuS18NenS6Q= -github.com/submariner-io/admiral v0.18.0-m3/go.mod h1:vkFQs9wWq1HY4k3JesxlLfKZTaA/llOkbdCDZCdk3yc= +github.com/submariner-io/admiral v0.18.0-m3.0.20240514134824-b20caee1a104 h1:1d22eI0EdGEzH2KeCOE3/Hp+1AVyK8Suz+HWyl1fRS8= +github.com/submariner-io/admiral v0.18.0-m3.0.20240514134824-b20caee1a104/go.mod h1:hQtSx5shxVotFbHEC6oQ7+qRKIdSHrXTE1IVNKkI7II= github.com/submariner-io/shipyard v0.18.0-m3 h1:N0/BAwTv5p6O7PgvQeouUzcgybJtq7QQ66gjos7+F48= github.com/submariner-io/shipyard v0.18.0-m3/go.mod h1:qs1LOCrPfM6H3JzR8TWNXFW4hvBiY+8gJ6OOjF4o4E0= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= @@ -632,8 +632,8 @@ k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSn k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.7/go.mod h1:PHgbrJT7lCHcxMU+mDHEm+nx46H4zuuHZkDP6icnhu0= sigs.k8s.io/controller-runtime v0.6.1/go.mod h1:XRYBPdbf5XJu9kpS84VJiZ7h/u1hF3gEORz0efEja7A= -sigs.k8s.io/controller-runtime v0.18.0 h1:Z7jKuX784TQSUL1TIyeuF7j8KXZ4RtSX0YgtjKcSTME= -sigs.k8s.io/controller-runtime v0.18.0/go.mod h1:tuAt1+wbVsXIT8lPtk5RURxqAnq7xkpv2Mhttslg7Hw= +sigs.k8s.io/controller-runtime v0.18.1 h1:RpWbigmuiylbxOCLy0tGnq1cU1qWPwNIQzoJk+QeJx4= +sigs.k8s.io/controller-runtime v0.18.1/go.mod h1:tuAt1+wbVsXIT8lPtk5RURxqAnq7xkpv2Mhttslg7Hw= sigs.k8s.io/controller-tools v0.3.0/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index 78c373da..60bafd87 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -108,6 +108,17 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN return nil, errors.Wrap(err, "error creating Service syncer") } + syncerConf.NamespaceInformer, err = syncer.NewSharedInformer(&syncer.ResourceSyncerConfig{ + SourceClient: syncerConf.LocalClient, + RestMapper: syncerConf.RestMapper, + ResourceType: &corev1.Namespace{}, + }) + if err != nil { + return nil, errors.Wrap(err, "error creating namespace informer") + } + + agentController.namespaceInformer = syncerConf.NamespaceInformer + agentController.serviceExportClient = &ServiceExportClient{ NamespaceableResourceInterface: syncerConf.LocalClient.Resource(*gvr), converter: converter{scheme: syncerConf.Scheme}, @@ -139,6 +150,10 @@ func (a *Controller) Start(stopCh <-chan struct{}) error { // Start the informer factories to begin populating the informer caches logger.Info("Starting Agent controller") + go func() { + a.namespaceInformer.Run(stopCh) + }() + if err := a.serviceExportSyncer.Start(stopCh); err != nil { return errors.Wrap(err, "error starting ServiceExport syncer") } diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go index 8e29d493..1fc9932a 100644 --- a/pkg/agent/controller/clusterip_service_test.go +++ b/pkg/agent/controller/clusterip_service_test.go @@ -23,6 +23,7 @@ import ( "strconv" . "github.com/onsi/ginkgo/v2" + "github.com/submariner-io/admiral/pkg/fake" "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/test" testutil "github.com/submariner-io/admiral/pkg/test" @@ -288,6 +289,48 @@ func testClusterIPServiceInOneCluster() { test.RemoteNamespace)), "other-eps") }) + When("the namespace of an exported service does not initially exist on an importing cluster", func() { + BeforeEach(func() { + fake.AddVerifyNamespaceReactor(&t.cluster2.localDynClient.Fake, "serviceimports", "endpointslices") + }) + + JustBeforeEach(func() { + t.cluster1.createService() + t.cluster1.createServiceExport() + }) + + It("should eventually import the service when the namespace is created", func() { + expServiceImport := &mcsv1a1.ServiceImport{ + ObjectMeta: metav1.ObjectMeta{ + Name: t.cluster1.service.Name, + Namespace: t.cluster1.service.Namespace, + }, + Spec: mcsv1a1.ServiceImportSpec{ + Type: mcsv1a1.ClusterSetIP, + Ports: t.aggregatedServicePorts, + }, + Status: mcsv1a1.ServiceImportStatus{ + Clusters: []mcsv1a1.ClusterStatus{{Cluster: t.cluster1.clusterID}}, + }, + } + + awaitServiceImport(t.cluster1.localServiceImportClient, expServiceImport) + + testutil.EnsureNoResource(resource.ForDynamic(t.cluster2.localServiceImportClient.Namespace( + t.cluster1.service.Namespace)), t.cluster1.service.Name) + t.cluster2.ensureNoEndpointSlice() + + By("Creating namespace on importing cluster") + + test.CreateResource(t.cluster2.localDynClient.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")), + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: t.cluster1.service.Namespace}, + }) + + t.awaitNonHeadlessServiceExported(&t.cluster1) + }) + }) + When("an existing ServiceExport has the legacy Synced status condition", func() { BeforeEach(func() { t.cluster1.serviceExport.Status.Conditions = []mcsv1a1.ServiceExportCondition{ diff --git a/pkg/agent/controller/controller_suite_test.go b/pkg/agent/controller/controller_suite_test.go index 1f13de84..93bdbe65 100644 --- a/pkg/agent/controller/controller_suite_test.go +++ b/pkg/agent/controller/controller_suite_test.go @@ -109,8 +109,6 @@ func init() { if err != nil { panic(err) } - - controller.BrokerResyncPeriod = time.Millisecond * 100 } func TestController(t *testing.T) { @@ -293,7 +291,7 @@ func newTestDiver() *testDriver { syncerConfig: &broker.SyncerConfig{ BrokerNamespace: test.RemoteNamespace, RestMapper: test.GetRESTMapperFor(&mcsv1a1.ServiceExport{}, &mcsv1a1.ServiceImport{}, &corev1.Service{}, - &corev1.Endpoints{}, &discovery.EndpointSlice{}, controller.GetGlobalIngressIPObj()), + &corev1.Endpoints{}, &corev1.Namespace{}, &discovery.EndpointSlice{}, controller.GetGlobalIngressIPObj()), BrokerClient: brokerClient, Scheme: syncerScheme, }, diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index afafcfed..ce639e70 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -90,16 +90,17 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen } controller.remoteSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{ - Name: "Remote ServiceImport", - SourceClient: brokerClient, - SourceNamespace: brokerNamespace, - RestMapper: syncerConfig.RestMapper, - Federator: federate.NewCreateOrUpdateFederator(syncerConfig.LocalClient, syncerConfig.RestMapper, corev1.NamespaceAll, ""), - ResourceType: &mcsv1a1.ServiceImport{}, - Transform: controller.onRemoteServiceImport, - OnSuccessfulSync: controller.serviceImportMigrator.onSuccessfulSyncFromBroker, - Scheme: syncerConfig.Scheme, - ResyncPeriod: BrokerResyncPeriod, + Name: "Remote ServiceImport", + SourceClient: brokerClient, + SourceNamespace: brokerNamespace, + RestMapper: syncerConfig.RestMapper, + Federator: federate.NewCreateOrUpdateFederator(syncerConfig.LocalClient, syncerConfig.RestMapper, corev1.NamespaceAll, ""), + ResourceType: &mcsv1a1.ServiceImport{}, + Transform: controller.onRemoteServiceImport, + OnSuccessfulSync: controller.serviceImportMigrator.onSuccessfulSyncFromBroker, + Scheme: syncerConfig.Scheme, + ResyncPeriod: BrokerResyncPeriod, + NamespaceInformer: syncerConfig.NamespaceInformer, SyncCounterOpts: &prometheus.GaugeOpts{ Name: syncerMetricNames.ServiceImportCounterName, Help: "Count of imported services", diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go index 176e9ad1..9e261144 100644 --- a/pkg/agent/controller/types.go +++ b/pkg/agent/controller/types.go @@ -32,6 +32,7 @@ import ( k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -59,6 +60,7 @@ type Controller struct { serviceSyncer syncer.Interface serviceImportController *ServiceImportController localServiceImportFederator federate.Federator + namespaceInformer cache.SharedInformer } type AgentSpecification struct {