Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VSS/instant-updates: more stable event watcher #898

Merged
merged 7 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions controllers/vaultstaticsecret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ func (r *VaultStaticSecretReconciler) ensureEventWatcher(ctx context.Context, o
// launch the goroutine to watch events
logger.V(consts.LogLevelDebug).Info("Starting event watcher", "meta", updatedMeta)
r.eventWatcherRegistry.Register(name, updatedMeta)
go r.getEvents(watchCtx, o, wsClient, stoppedCh)
// Pass a dereferenced VSS object here to avoid the effect of the VSS object
// being garbage collected before the goroutine is started/evaluated
go r.getEvents(watchCtx, *o, wsClient, stoppedCh)

return nil
}
Expand All @@ -312,9 +314,9 @@ func (r *VaultStaticSecretReconciler) unWatchEvents(o *secretsv1beta1.VaultStati

// getEvents calls streamStaticSecretEvents in a loop, collecting and responding
// to any errors returned.
func (r *VaultStaticSecretReconciler) getEvents(ctx context.Context, o *secretsv1beta1.VaultStaticSecret, wsClient *vault.WebsocketClient, stoppedCh chan struct{}) {
func (r *VaultStaticSecretReconciler) getEvents(ctx context.Context, o secretsv1beta1.VaultStaticSecret, wsClient *vault.WebsocketClient, stoppedCh chan struct{}) {
logger := log.FromContext(ctx).WithName("getEvents")
name := client.ObjectKeyFromObject(o)
name := client.ObjectKeyFromObject(&o)
defer func() {
r.eventWatcherRegistry.Delete(name)
close(stoppedCh)
Expand Down Expand Up @@ -343,7 +345,7 @@ eventLoop:
}
time.Sleep(retryBackoff.NextBackOff())
}
err := r.streamStaticSecretEvents(ctx, o, wsClient)
err := r.streamStaticSecretEvents(ctx, &o, wsClient)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "context canceled") {
Expand All @@ -362,15 +364,15 @@ eventLoop:
// For any other errors, we emit the error as an event on the
// VaultStaticSecret, reload the client and try connecting
// again.
r.Recorder.Eventf(o, corev1.EventTypeWarning, consts.ReasonEventWatcherError,
r.Recorder.Eventf(&o, corev1.EventTypeWarning, consts.ReasonEventWatcherError,
"Error while watching events: %s", err)

if errorCount >= errorThreshold {
logger.Error(err, "Too many errors while watching events, requeuing")
break eventLoop
}

newVaultClient, err := r.ClientFactory.Get(ctx, r.Client, o)
newVaultClient, err := r.ClientFactory.Get(ctx, r.Client, &o)
if err != nil {
logger.Error(err, "Failed to retrieve Vault client")
break eventLoop
Expand All @@ -383,7 +385,7 @@ eventLoop:
}

// Update the LastClientID in the event registry
key := client.ObjectKeyFromObject(o)
key := client.ObjectKeyFromObject(&o)
meta, ok := r.eventWatcherRegistry.Get(key)
if !ok {
logger.Error(
Expand Down
22 changes: 22 additions & 0 deletions test/integration/vaultstaticsecret_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,28 @@ func TestVaultStaticSecret(t *testing.T) {
awaitRolloutRestarts(t, ctx, crdClient, obj, obj.Spec.RolloutRestartTargets)
}
}

if expectInitial && obj.Spec.SyncConfig != nil && obj.Spec.SyncConfig.InstantUpdates {
// Ensure the event watcher has started so that subsequent updates
// are detected and synced.
assert.NoError(t, backoff.Retry(func() error {
objEvents := corev1.EventList{}
err := crdClient.List(ctx, &objEvents,
ctrlclient.InNamespace(obj.Namespace),
ctrlclient.MatchingFields{
"involvedObject.name": obj.Name,
"reason": consts.ReasonEventWatcherStarted,
},
)
if err != nil {
return err
}
if len(objEvents.Items) == 0 {
return fmt.Errorf("no EventWatcherStarted event for %s", obj.Name)
}
return nil
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Millisecond*500), 200)))
}
}

for _, tt := range tests {
Expand Down