From 2b9b790f51d08928a655025831e40d9a6debc83f Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 3 Sep 2024 13:04:04 -0400 Subject: [PATCH] Fix intermittent UT failure re: recreating namespace ...due to a timing issue with updates with the fake client when the cache isn't fully synced initially. Signed-off-by: Tom Pantelis --- pkg/syncer/resource_syncer.go | 16 ++++++++-------- pkg/syncer/resource_syncer_test.go | 10 ++++++---- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index 72cf929a..d6d5508c 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -194,7 +194,7 @@ type ResourceSyncerConfig struct { type resourceSyncer struct { workQueue workqueue.Interface - hasSynced func() bool + cachesSynced []cache.InformerSynced unregHandler func() informer cache.Controller store cache.Store @@ -243,7 +243,7 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { Transform: resourceUtil.TrimManagedFields, }) - syncer.hasSynced = syncer.informer.HasSynced + syncer.cachesSynced = append(syncer.cachesSynced, syncer.informer.HasSynced) return syncer, nil } @@ -265,7 +265,7 @@ func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer return nil, errors.Wrapf(err, "error registering event handler") } - syncer.hasSynced = reg.HasSynced + syncer.cachesSynced = append(syncer.cachesSynced, reg.HasSynced) syncer.unregHandler = func() { _ = informer.RemoveEventHandler(reg) @@ -315,7 +315,7 @@ func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) { syncer.workQueue = workqueue.New(config.Name) if config.NamespaceInformer != nil { - _, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ + reg, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ AddFunc: func(obj interface{}, _ bool) { syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceAddedKey, resourceUtil.MustToMeta(obj).GetName()).String())) }, @@ -329,6 +329,8 @@ func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) { if err != nil { return nil, errors.Wrapf(err, "error registering namespace handler") } + + syncer.cachesSynced = append(syncer.cachesSynced, reg.HasSynced) } return syncer, nil @@ -387,7 +389,7 @@ func (r *resourceSyncer) Start(stopCh <-chan struct{}) error { if *r.config.WaitForCacheSync { r.log.V(log.LIBDEBUG).Infof("Syncer %q waiting for informer cache to sync", r.config.Name) - _ = cache.WaitForCacheSync(stopCh, r.hasSynced) + _ = cache.WaitForCacheSync(stopCh, r.cachesSynced...) } r.workQueue.Run(stopCh, r.processNextWorkItem) @@ -516,7 +518,7 @@ func (r *resourceSyncer) doReconcile(resourceLister func() []runtime.Object) { } func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any { - if ok := cache.WaitForCacheSync(r.stopCh, r.hasSynced); !ok { + if ok := cache.WaitForCacheSync(r.stopCh, r.cachesSynced...); !ok { // This means the cache was stopped. r.log.Warningf("Syncer %q failed to wait for informer cache to sync", r.config.Name) @@ -837,8 +839,6 @@ func (r *resourceSyncer) handleNamespaceAdded(namespace string) { ns, name, _ := cache.SplitMetaNamespaceKey(k) r.RequeueResource(name, ns) } - - delete(r.missingNamespaces, namespace) } } diff --git a/pkg/syncer/resource_syncer_test.go b/pkg/syncer/resource_syncer_test.go index 04676162..215ef3d3 100644 --- a/pkg/syncer/resource_syncer_test.go +++ b/pkg/syncer/resource_syncer_test.go @@ -1283,10 +1283,6 @@ func testWithMissingNamespace() { createNamespace(test.LocalNamespace) fakereactor.AddVerifyNamespaceReactor(&d.config.SourceClient.(*fakeClient.FakeDynamicClient).Fake, "pods") - - if d.config.NamespaceInformer != nil { - go d.config.NamespaceInformer.Run(d.stopCh) - } }) Specify("distribute should eventually succeed when the namespace is created", func() { @@ -1500,6 +1496,12 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer. Expect(err).To(Succeed()) + if d.config.NamespaceInformer != nil { + go func() { + d.config.NamespaceInformer.Run(d.stopCh) + }() + } + utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(_ context.Context, err error, _ string, _ ...interface{}) { d.handledError <- err