Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VSS/instant-updates: more stable event watcher #898

Merged
merged 7 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions controllers/vaultstaticsecret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,10 @@ func (r *VaultStaticSecretReconciler) ensureEventWatcher(ctx context.Context, o
// launch the goroutine to watch events
logger.V(consts.LogLevelDebug).Info("Starting event watcher", "meta", updatedMeta)
r.eventWatcherRegistry.Register(name, updatedMeta)
go r.getEvents(watchCtx, o, wsClient, stoppedCh)
// Pass a dereferenced VSS object here because it seems to avoid an issue
// where the EventWatcherStarted event is occasionally emitted without a
// name or namespace attached.
go r.getEvents(watchCtx, *o, wsClient, stoppedCh)

return nil
}
Expand All @@ -313,9 +316,9 @@ func (r *VaultStaticSecretReconciler) unWatchEvents(o *secretsv1beta1.VaultStati

// getEvents calls streamStaticSecretEvents in a loop, collecting and responding
// to any errors returned.
func (r *VaultStaticSecretReconciler) getEvents(ctx context.Context, o *secretsv1beta1.VaultStaticSecret, wsClient *vault.WebsocketClient, stoppedCh chan struct{}) {
func (r *VaultStaticSecretReconciler) getEvents(ctx context.Context, o secretsv1beta1.VaultStaticSecret, wsClient *vault.WebsocketClient, stoppedCh chan struct{}) {
logger := log.FromContext(ctx).WithName("getEvents")
name := client.ObjectKeyFromObject(o)
name := client.ObjectKeyFromObject(&o)
defer func() {
r.eventWatcherRegistry.Delete(name)
close(stoppedCh)
Expand Down Expand Up @@ -344,7 +347,7 @@ eventLoop:
}
time.Sleep(retryBackoff.NextBackOff())
}
err := r.streamStaticSecretEvents(ctx, o, wsClient)
err := r.streamStaticSecretEvents(ctx, &o, wsClient)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "context canceled") {
Expand All @@ -363,15 +366,15 @@ eventLoop:
// For any other errors, we emit the error as an event on the
// VaultStaticSecret, reload the client and try connecting
// again.
r.Recorder.Eventf(o, corev1.EventTypeWarning, consts.ReasonEventWatcherError,
r.Recorder.Eventf(&o, corev1.EventTypeWarning, consts.ReasonEventWatcherError,
"Error while watching events: %s", err)

if errorCount >= errorThreshold {
logger.Error(err, "Too many errors while watching events, requeuing")
break eventLoop
}

newVaultClient, err := r.ClientFactory.Get(ctx, r.Client, o)
newVaultClient, err := r.ClientFactory.Get(ctx, r.Client, &o)
if err != nil {
logger.Error(err, "Failed to retrieve Vault client")
break eventLoop
Expand All @@ -384,7 +387,7 @@ eventLoop:
}

// Update the LastClientID in the event registry
key := client.ObjectKeyFromObject(o)
key := client.ObjectKeyFromObject(&o)
meta, ok := r.eventWatcherRegistry.Get(key)
if !ok {
logger.Error(
Expand Down Expand Up @@ -434,7 +437,7 @@ func (r *VaultStaticSecretReconciler) streamStaticSecretEvents(ctx context.Conte

// We made it past the initial websocket connection, so emit a "good" event
// status
r.Recorder.Eventf(o, corev1.EventTypeNormal, consts.ReasonEventWatcherStarted, "Started watching events")
r.Recorder.Event(o, corev1.EventTypeNormal, consts.ReasonEventWatcherStarted, "Started watching events")

for {
select {
Expand Down
23 changes: 23 additions & 0 deletions test/integration/vaultstaticsecret_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,29 @@ func TestVaultStaticSecret(t *testing.T) {
awaitRolloutRestarts(t, ctx, crdClient, obj, obj.Spec.RolloutRestartTargets)
}
}

if expectInitial && obj.Spec.SyncConfig != nil && obj.Spec.SyncConfig.InstantUpdates {
// Ensure the (Vault) event watcher has started by waiting for the
// EventWatcherStarted k8s event so that subsequent Vault updates
// are detected and synced.
assert.NoError(t, backoff.Retry(func() error {
objEvents := corev1.EventList{}
err := crdClient.List(ctx, &objEvents,
ctrlclient.InNamespace(obj.Namespace),
ctrlclient.MatchingFields{
"involvedObject.name": obj.Name,
"reason": consts.ReasonEventWatcherStarted,
},
)
if err != nil {
return err
}
if len(objEvents.Items) == 0 {
return fmt.Errorf("no EventWatcherStarted event for %s", obj.Name)
}
return nil
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Millisecond*500), 200)))
}
}

for _, tt := range tests {
Expand Down