From 2a50e1e193df5530a227a22b24bbfad32165377d Mon Sep 17 00:00:00 2001 From: alex <8968914+acpana@users.noreply.github.com> Date: Mon, 18 Sep 2023 10:53:23 -0700 Subject: [PATCH] feat: enhance replay (#2984) Signed-off-by: Alex Pana <8968914+acpana@users.noreply.github.com> Signed-off-by: alex <8968914+acpana@users.noreply.github.com> Co-authored-by: Rita Zhang --- pkg/cachemanager/cachemanager.go | 43 +++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/pkg/cachemanager/cachemanager.go b/pkg/cachemanager/cachemanager.go index 5af1e2c4b5a..00b1691d26a 100644 --- a/pkg/cachemanager/cachemanager.go +++ b/pkg/cachemanager/cachemanager.go @@ -318,6 +318,14 @@ func (c *CacheManager) syncGVK(ctx context.Context, gvk schema.GroupVersionKind) func (c *CacheManager) manageCache(ctx context.Context) { // relistStopChan is used to stop any list operations still in progress relistStopChan := make(chan struct{}) + // waitToCloseChan is used to wait on the relist goroutine to end + // when needing to create another one. This ensures that we are essentially + // only using a singleton routine to relist gvks. + waitToCloseChan := make(chan struct{}) + + // edge case: the 0th relist goroutine is "stopped", by definition, so we close the wait channel + // but it's also "running" so we don't close the kill channel in order to do so in the for loop below. + close(waitToCloseChan) for { select { @@ -341,7 +349,17 @@ func (c *CacheManager) manageCache(ctx context.Context) { // stop any goroutines that were relisting before // as we may no longer be interested in those gvks + // and wait with a timeout for the child gorountine to stop. close(relistStopChan) + select { + case <-waitToCloseChan: + // child goroutine exited gracefully + break + case <-time.After(time.Second * 10): + log.Error(fmt.Errorf("internal: background relist did not exit gracefully"), "possible goroutine leak") + // do not close waitToCloseChan as the goroutine may eventually exit and call close on the channel + break + } // assume all gvks need to be relisted // and while under lock, make a copy of @@ -352,8 +370,12 @@ func (c *CacheManager) manageCache(ctx context.Context) { // clean state c.needToList = false relistStopChan = make(chan struct{}) + waitToCloseChan = make(chan struct{}) - go c.replayGVKs(ctx, gvksToRelist, relistStopChan) + go func() { + c.replayGVKs(ctx, gvksToRelist, relistStopChan) + close(waitToCloseChan) + }() }() } } @@ -373,14 +395,23 @@ func (c *CacheManager) replayGVKs(ctx context.Context, gvksToRelist []schema.Gro case <-stopCh: return default: - operation := func() (bool, error) { - if err := c.syncGVK(ctx, gvk); err != nil { - return false, err + operation := func(ctx context.Context) (bool, error) { + select { + // make sure that the stop channel hasn't closed yet in order to stop + // the operation in the backoff retry-er earlier so we don't sync GVKs + // that we may not want to sync anymore. This also ensures that we exit + // the func as soon as possible. + case <-stopCh: + return true, nil + default: + if err := c.syncGVK(ctx, gvk); err != nil { + return false, err + } + return true, nil } - return true, nil } - if err := wait.ExponentialBackoff(backoff, operation); err != nil { + if err := wait.ExponentialBackoffWithContext(ctx, backoff, operation); err != nil { log.Error(err, "internal: error listings gvk cache data", "gvk", gvk) } else { gvksSet.Remove(gvk)