Skip to content

Commit

Permalink
VSS/instant-updates: more stable event watcher tests (#898)
Browse files Browse the repository at this point in the history
Wait for the EventWatcherStarted k8s event before proceeding with the
instant updates tests, to ensure the Vault event subscription is setup
before the tests proceed with modifying Vault. Also pass a
dereferenced VSS object to the vault event watcher goroutine to avoid
the EventWatcherStarted k8s event being emitted without a name or
namespace.
  • Loading branch information
tvoran committed Sep 21, 2024
1 parent 41f038f commit 8f82498
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
19 changes: 11 additions & 8 deletions controllers/vaultstaticsecret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,10 @@ func (r *VaultStaticSecretReconciler) ensureEventWatcher(ctx context.Context, o
// launch the goroutine to watch events
logger.V(consts.LogLevelDebug).Info("Starting event watcher", "meta", updatedMeta)
r.eventWatcherRegistry.Register(name, updatedMeta)
go r.getEvents(watchCtx, o, wsClient, stoppedCh)
// Pass a dereferenced VSS object here because it seems to avoid an issue
// where the EventWatcherStarted event is occasionally emitted without a
// name or namespace attached.
go r.getEvents(watchCtx, *o, wsClient, stoppedCh)

return nil
}
Expand All @@ -313,9 +316,9 @@ func (r *VaultStaticSecretReconciler) unWatchEvents(o *secretsv1beta1.VaultStati

// getEvents calls streamStaticSecretEvents in a loop, collecting and responding
// to any errors returned.
func (r *VaultStaticSecretReconciler) getEvents(ctx context.Context, o *secretsv1beta1.VaultStaticSecret, wsClient *vault.WebsocketClient, stoppedCh chan struct{}) {
func (r *VaultStaticSecretReconciler) getEvents(ctx context.Context, o secretsv1beta1.VaultStaticSecret, wsClient *vault.WebsocketClient, stoppedCh chan struct{}) {
logger := log.FromContext(ctx).WithName("getEvents")
name := client.ObjectKeyFromObject(o)
name := client.ObjectKeyFromObject(&o)
defer func() {
r.eventWatcherRegistry.Delete(name)
close(stoppedCh)
Expand Down Expand Up @@ -344,7 +347,7 @@ eventLoop:
}
time.Sleep(retryBackoff.NextBackOff())
}
err := r.streamStaticSecretEvents(ctx, o, wsClient)
err := r.streamStaticSecretEvents(ctx, &o, wsClient)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "context canceled") {
Expand All @@ -363,15 +366,15 @@ eventLoop:
// For any other errors, we emit the error as an event on the
// VaultStaticSecret, reload the client and try connecting
// again.
r.Recorder.Eventf(o, corev1.EventTypeWarning, consts.ReasonEventWatcherError,
r.Recorder.Eventf(&o, corev1.EventTypeWarning, consts.ReasonEventWatcherError,
"Error while watching events: %s", err)

if errorCount >= errorThreshold {
logger.Error(err, "Too many errors while watching events, requeuing")
break eventLoop
}

newVaultClient, err := r.ClientFactory.Get(ctx, r.Client, o)
newVaultClient, err := r.ClientFactory.Get(ctx, r.Client, &o)
if err != nil {
logger.Error(err, "Failed to retrieve Vault client")
break eventLoop
Expand All @@ -384,7 +387,7 @@ eventLoop:
}

// Update the LastClientID in the event registry
key := client.ObjectKeyFromObject(o)
key := client.ObjectKeyFromObject(&o)
meta, ok := r.eventWatcherRegistry.Get(key)
if !ok {
logger.Error(
Expand Down Expand Up @@ -434,7 +437,7 @@ func (r *VaultStaticSecretReconciler) streamStaticSecretEvents(ctx context.Conte

// We made it past the initial websocket connection, so emit a "good" event
// status
r.Recorder.Eventf(o, corev1.EventTypeNormal, consts.ReasonEventWatcherStarted, "Started watching events")
r.Recorder.Event(o, corev1.EventTypeNormal, consts.ReasonEventWatcherStarted, "Started watching events")

for {
select {
Expand Down
23 changes: 23 additions & 0 deletions test/integration/vaultstaticsecret_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,29 @@ func TestVaultStaticSecret(t *testing.T) {
awaitRolloutRestarts(t, ctx, crdClient, obj, obj.Spec.RolloutRestartTargets)
}
}

if expectInitial && obj.Spec.SyncConfig != nil && obj.Spec.SyncConfig.InstantUpdates {
// Ensure the (Vault) event watcher has started by waiting for the
// EventWatcherStarted k8s event so that subsequent Vault updates
// are detected and synced.
assert.NoError(t, backoff.Retry(func() error {
objEvents := corev1.EventList{}
err := crdClient.List(ctx, &objEvents,
ctrlclient.InNamespace(obj.Namespace),
ctrlclient.MatchingFields{
"involvedObject.name": obj.Name,
"reason": consts.ReasonEventWatcherStarted,
},
)
if err != nil {
return err
}
if len(objEvents.Items) == 0 {
return fmt.Errorf("no EventWatcherStarted event for %s", obj.Name)
}
return nil
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Millisecond*500), 200)))
}
}

for _, tt := range tests {
Expand Down

0 comments on commit 8f82498

Please sign in to comment.