From 8f82498e38c304493706ca8de89a39e5a263e378 Mon Sep 17 00:00:00 2001 From: Theron Voran Date: Fri, 20 Sep 2024 17:36:55 -0700 Subject: [PATCH] VSS/instant-updates: more stable event watcher tests (#898) Wait for the EventWatcherStarted k8s event before proceeding with the instant updates tests, to ensure the Vault event subscription is setup before the tests proceed with modifying Vault. Also pass a dereferenced VSS object to the vault event watcher goroutine to avoid the EventWatcherStarted k8s event being emitted without a name or namespace. --- controllers/vaultstaticsecret_controller.go | 19 ++++++++------- .../vaultstaticsecret_integration_test.go | 23 +++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/controllers/vaultstaticsecret_controller.go b/controllers/vaultstaticsecret_controller.go index 7730b030..da9f8fdb 100644 --- a/controllers/vaultstaticsecret_controller.go +++ b/controllers/vaultstaticsecret_controller.go @@ -293,7 +293,10 @@ func (r *VaultStaticSecretReconciler) ensureEventWatcher(ctx context.Context, o // launch the goroutine to watch events logger.V(consts.LogLevelDebug).Info("Starting event watcher", "meta", updatedMeta) r.eventWatcherRegistry.Register(name, updatedMeta) - go r.getEvents(watchCtx, o, wsClient, stoppedCh) + // Pass a dereferenced VSS object here because it seems to avoid an issue + // where the EventWatcherStarted event is occasionally emitted without a + // name or namespace attached. + go r.getEvents(watchCtx, *o, wsClient, stoppedCh) return nil } @@ -313,9 +316,9 @@ func (r *VaultStaticSecretReconciler) unWatchEvents(o *secretsv1beta1.VaultStati // getEvents calls streamStaticSecretEvents in a loop, collecting and responding // to any errors returned. -func (r *VaultStaticSecretReconciler) getEvents(ctx context.Context, o *secretsv1beta1.VaultStaticSecret, wsClient *vault.WebsocketClient, stoppedCh chan struct{}) { +func (r *VaultStaticSecretReconciler) getEvents(ctx context.Context, o secretsv1beta1.VaultStaticSecret, wsClient *vault.WebsocketClient, stoppedCh chan struct{}) { logger := log.FromContext(ctx).WithName("getEvents") - name := client.ObjectKeyFromObject(o) + name := client.ObjectKeyFromObject(&o) defer func() { r.eventWatcherRegistry.Delete(name) close(stoppedCh) @@ -344,7 +347,7 @@ eventLoop: } time.Sleep(retryBackoff.NextBackOff()) } - err := r.streamStaticSecretEvents(ctx, o, wsClient) + err := r.streamStaticSecretEvents(ctx, &o, wsClient) if err != nil { if strings.Contains(err.Error(), "use of closed network connection") || strings.Contains(err.Error(), "context canceled") { @@ -363,7 +366,7 @@ eventLoop: // For any other errors, we emit the error as an event on the // VaultStaticSecret, reload the client and try connecting // again. - r.Recorder.Eventf(o, corev1.EventTypeWarning, consts.ReasonEventWatcherError, + r.Recorder.Eventf(&o, corev1.EventTypeWarning, consts.ReasonEventWatcherError, "Error while watching events: %s", err) if errorCount >= errorThreshold { @@ -371,7 +374,7 @@ eventLoop: break eventLoop } - newVaultClient, err := r.ClientFactory.Get(ctx, r.Client, o) + newVaultClient, err := r.ClientFactory.Get(ctx, r.Client, &o) if err != nil { logger.Error(err, "Failed to retrieve Vault client") break eventLoop @@ -384,7 +387,7 @@ eventLoop: } // Update the LastClientID in the event registry - key := client.ObjectKeyFromObject(o) + key := client.ObjectKeyFromObject(&o) meta, ok := r.eventWatcherRegistry.Get(key) if !ok { logger.Error( @@ -434,7 +437,7 @@ func (r *VaultStaticSecretReconciler) streamStaticSecretEvents(ctx context.Conte // We made it past the initial websocket connection, so emit a "good" event // status - r.Recorder.Eventf(o, corev1.EventTypeNormal, consts.ReasonEventWatcherStarted, "Started watching events") + r.Recorder.Event(o, corev1.EventTypeNormal, consts.ReasonEventWatcherStarted, "Started watching events") for { select { diff --git a/test/integration/vaultstaticsecret_integration_test.go b/test/integration/vaultstaticsecret_integration_test.go index a02b5233..e7a52a92 100644 --- a/test/integration/vaultstaticsecret_integration_test.go +++ b/test/integration/vaultstaticsecret_integration_test.go @@ -435,6 +435,29 @@ func TestVaultStaticSecret(t *testing.T) { awaitRolloutRestarts(t, ctx, crdClient, obj, obj.Spec.RolloutRestartTargets) } } + + if expectInitial && obj.Spec.SyncConfig != nil && obj.Spec.SyncConfig.InstantUpdates { + // Ensure the (Vault) event watcher has started by waiting for the + // EventWatcherStarted k8s event so that subsequent Vault updates + // are detected and synced. + assert.NoError(t, backoff.Retry(func() error { + objEvents := corev1.EventList{} + err := crdClient.List(ctx, &objEvents, + ctrlclient.InNamespace(obj.Namespace), + ctrlclient.MatchingFields{ + "involvedObject.name": obj.Name, + "reason": consts.ReasonEventWatcherStarted, + }, + ) + if err != nil { + return err + } + if len(objEvents.Items) == 0 { + return fmt.Errorf("no EventWatcherStarted event for %s", obj.Name) + } + return nil + }, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Millisecond*500), 200))) + } } for _, tt := range tests {