Skip to content

Commit

Permalink
feat: enhance replay (#2984)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Pana <[email protected]>
Signed-off-by: alex <[email protected]>
Co-authored-by: Rita Zhang <[email protected]>
  • Loading branch information
acpana and ritazh authored Sep 18, 2023
1 parent 2ed7d79 commit 2a50e1e
Showing 1 changed file with 37 additions and 6 deletions.
43 changes: 37 additions & 6 deletions pkg/cachemanager/cachemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ func (c *CacheManager) syncGVK(ctx context.Context, gvk schema.GroupVersionKind)
func (c *CacheManager) manageCache(ctx context.Context) {
// relistStopChan is used to stop any list operations still in progress
relistStopChan := make(chan struct{})
// waitToCloseChan is used to wait on the relist goroutine to end
// when needing to create another one. This ensures that we are essentially
// only using a singleton routine to relist gvks.
waitToCloseChan := make(chan struct{})

// edge case: before the first relist there is no goroutine to wait on, so waitToCloseChan
// starts out closed; relistStopChan is left open here because the for loop below closes it.
close(waitToCloseChan)

for {
select {
Expand All @@ -341,7 +349,17 @@ func (c *CacheManager) manageCache(ctx context.Context) {

// stop any goroutines that were relisting before
// as we may no longer be interested in those gvks
// and wait with a timeout for the child goroutine to stop.
close(relistStopChan)
select {
case <-waitToCloseChan:
// child goroutine exited gracefully
break
case <-time.After(time.Second * 10):
log.Error(fmt.Errorf("internal: background relist did not exit gracefully"), "possible goroutine leak")
// do not close waitToCloseChan as the goroutine may eventually exit and call close on the channel
break
}

// assume all gvks need to be relisted
// and while under lock, make a copy of
Expand All @@ -352,8 +370,12 @@ func (c *CacheManager) manageCache(ctx context.Context) {
// clean state
c.needToList = false
relistStopChan = make(chan struct{})
waitToCloseChan = make(chan struct{})

go c.replayGVKs(ctx, gvksToRelist, relistStopChan)
go func() {
c.replayGVKs(ctx, gvksToRelist, relistStopChan)
close(waitToCloseChan)
}()
}()
}
}
Expand All @@ -373,14 +395,23 @@ func (c *CacheManager) replayGVKs(ctx context.Context, gvksToRelist []schema.Gro
case <-stopCh:
return
default:
operation := func() (bool, error) {
if err := c.syncGVK(ctx, gvk); err != nil {
return false, err
operation := func(ctx context.Context) (bool, error) {
select {
// make sure that the stop channel hasn't closed yet in order to stop
// the operation in the backoff retry-er earlier so we don't sync GVKs
// that we may not want to sync anymore. This also ensures that we exit
// the func as soon as possible.
case <-stopCh:
return true, nil
default:
if err := c.syncGVK(ctx, gvk); err != nil {
return false, err
}
return true, nil
}
return true, nil
}

if err := wait.ExponentialBackoff(backoff, operation); err != nil {
if err := wait.ExponentialBackoffWithContext(ctx, backoff, operation); err != nil {
log.Error(err, "internal: error listings gvk cache data", "gvk", gvk)
} else {
gvksSet.Remove(gvk)
Expand Down

0 comments on commit 2a50e1e

Please sign in to comment.