Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make VPS integration tests fail early #700

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e1b3b26
WIP: reconcile VDS instances on lifetimeWatcher errors
benashz Apr 11, 2024
dabe9d6
Drop some extraneous code
benashz Apr 11, 2024
a4f071c
Drop encrypt client lock
benashz Apr 11, 2024
92f68c5
Drop vault.IsLeaseNotFoundError()
benashz Apr 11, 2024
b95ab74
Revert changes to cache_storage_test
benashz Apr 11, 2024
5c00b9f
Merge branch 'main' into VAULT-25273/vds-trigger-reconciliation-on-to…
benashz Apr 12, 2024
e344117
Get Vault client early, trigger sync on cache key changes
benashz Apr 12, 2024
dbf1221
Add ID() method to the Vault Client interface
benashz Apr 15, 2024
5f39089
Misc updates
benashz Apr 15, 2024
ed5dc0d
Add SyncController tests
benashz Apr 16, 2024
175df1c
Add vaultClientCallback tests
benashz Apr 16, 2024
1cdae92
Add some client_factory tests
benashz Apr 16, 2024
f356a88
client cache storage: don't expose the token accessor
benashz Apr 16, 2024
725e7db
Optionally requeue a request on error.
benashz Apr 17, 2024
4115e99
Fix tests
benashz Apr 17, 2024
15c84d4
Merge branch 'main' into VAULT-25273/vds-trigger-reconciliation-on-to…
benashz Apr 19, 2024
1823da3
Add integration tests
benashz Apr 19, 2024
20f3c57
Post review updates
benashz Apr 19, 2024
e11cceb
Make postgres init more resilient
benashz Apr 19, 2024
adfc9a5
callbackHandler: continue on error
benashz Apr 19, 2024
b21080d
Misc fix
benashz Apr 19, 2024
462929b
postgres init: exit with a non-zero status on failure
benashz Apr 19, 2024
cf4f566
Make VPS integration tests fail early
benashz Apr 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .copywrite.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ project {
# Default: []
header_ignore = [
".idea/**",
"build/**"
# "vendor/**",
# "**autogen**",
]
Expand Down
10 changes: 10 additions & 0 deletions api/v1beta1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,13 @@ type Template struct {
// Refer to https://pkg.go.dev/text/template for more information.
Text string `json:"text"`
}

// VaultClientMeta defines the observed state of the last Vault Client used to
// sync the secret. This status is used during resource reconciliation.
type VaultClientMeta struct {
// CacheKey is the unique key used to identify the client cache.
CacheKey string `json:"cacheKey,omitempty"`
// ID is the Vault ID of the authenticated client. The ID should never contain
// any sensitive information.
ID string `json:"id,omitempty"`
}
3 changes: 3 additions & 0 deletions api/v1beta1/vaultdynamicsecret_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type VaultDynamicSecretStatus struct {
// If drift is detected the data will be synced to the Destination.
// SecretMAC will only be stored when VaultDynamicSecretSpec.AllowStaticCreds is true.
SecretMAC string `json:"secretMAC,omitempty"`
// VaultClientMeta contains the status of the Vault client and is used during
// resource reconciliation.
VaultClientMeta VaultClientMeta `json:"vaultClientMeta,omitempty"`
}

type VaultSecretLease struct {
Expand Down
16 changes: 16 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions chart/crds/secrets.hashicorp.com_vaultdynamicsecrets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,21 @@ spec:
- rotationPeriod
- ttl
type: object
vaultClientMeta:
description: |-
VaultClientMeta contains the status of the Vault client and is used during
resource reconciliation.
properties:
cacheKey:
description: CacheKey is the unique key used to identify the client
cache.
type: string
id:
description: |-
ID is the Vault ID of the authenticated client. The ID should never contain
any sensitive information.
type: string
type: object
required:
- lastGeneration
- lastRenewalTime
Expand Down
15 changes: 15 additions & 0 deletions config/crd/bases/secrets.hashicorp.com_vaultdynamicsecrets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,21 @@ spec:
- rotationPeriod
- ttl
type: object
vaultClientMeta:
description: |-
VaultClientMeta contains the status of the Vault client and is used during
resource reconciliation.
properties:
cacheKey:
description: CacheKey is the unique key used to identify the client
cache.
type: string
id:
description: |-
ID is the Vault ID of the authenticated client. The ID should never contain
any sensitive information.
type: string
type: object
required:
- lastGeneration
- lastRenewalTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ spec:
args:
- "-client-cache-persistence-model=direct-encrypted"
- "-min-refresh-after-hvsa=3s"
- "-zap-log-level=5"
- "-zap-log-level=6"
8 changes: 5 additions & 3 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
)

var (
_ error = (*LeaseTruncatedError)(nil)
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
requeueDurationOnError = time.Second * 5
_ error = (*LeaseTruncatedError)(nil)
// random is not cryptographically secure, should not be used in any crypto
// type of operations.
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
requeueDurationOnError = time.Second * 5
// used by monkey patching unit tests
nowFunc = time.Now
)
Expand Down
215 changes: 215 additions & 0 deletions controllers/sync_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package controllers

import (
"context"
"errors"
"sync"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/hashicorp/vault-secrets-operator/internal/consts"
)

// SecretReconciler is an interface for a controller that can reconcile secrets.
// It is a subset of the controller-runtime Reconciler interface.
type SecretReconciler interface {
Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
Syncer
}

// Syncer is an interface for a controller that can sync secrets.
type Syncer interface {
Sync(ctx context.Context, req SyncRequest) (ctrl.Result, error)
Start(ctx context.Context) error
}

// SyncControllerOptions is the options for creating a SyncController.
type SyncControllerOptions struct {
// Name is the name of the controller. It is used for logging.
Name string
// Syncer is the Syncer to delegate sync requests to. It is required.
Syncer Syncer
// MaxConcurrentSyncs is the maximum number of concurrent syncs. It defaults to 1.
MaxConcurrentSyncs int
}

// SyncController is an interface for a controller that can sync secrets.
type SyncController interface {
Syncer
Start(ctx context.Context) error
}

// NewSyncController creates a new SyncController with the given Syncer and options.
func NewSyncController(opts SyncControllerOptions) (SyncController, error) {
maxConcurrentSyncs := opts.MaxConcurrentSyncs
if maxConcurrentSyncs < 1 {
maxConcurrentSyncs = 1
}
c := &defaultSyncController{
do: opts.Syncer,
maxConcurrentSyncs: maxConcurrentSyncs,
queue: workqueue.NewRateLimitingQueueWithConfig(
workqueue.DefaultControllerRateLimiter(),
workqueue.RateLimitingQueueConfig{
Name: opts.Name,
},
),
}

return c, nil
}

// SyncRequest is a request to sync a secret.
type SyncRequest struct {
// Request is the reconcile request for the secret.
ctrl.Request
// Delay is the delay before syncing the secret.
Delay time.Duration
// RequeueOnErr is a flag to requeue the request on error.
RequeueOnErr bool
}

var _ SyncController = &defaultSyncController{}

// defaultSyncController handles delegated secret reconciliation requests from a
// Syncer. The queue processing is based off of the controller-runtime
// internal/controller code, minus the k8s watchers and event handling.
type defaultSyncController struct {
do Syncer
queue workqueue.RateLimitingInterface
maxConcurrentSyncs int
mu sync.RWMutex
started bool
}

// Sync implements the SyncController interface. It delegates the SyncRequest to the Syncer.
func (c *defaultSyncController) Sync(ctx context.Context, req SyncRequest) (ctrl.Result, error) {
return ctrl.Result{}, c.syncHandler(ctx, req)
}

// Start starts the sync controller. It will start the sync workers and block
// until the context is done. It returns an error if the controller is already
// started. The context is used to stop the controller
func (c *defaultSyncController) Start(ctx context.Context) error {
c.mu.Lock()
if c.started {
return errors.New("controller already started")
}

go func() {
<-ctx.Done()
log.FromContext(ctx).Info("Shutting down sync controller")
c.queue.ShutDown()
}()

wg := &sync.WaitGroup{}
defer c.mu.Unlock()
wg.Add(c.maxConcurrentSyncs)
for i := 0; i < c.maxConcurrentSyncs; i++ {
go func() {
defer wg.Done()
for c.processNextWorkItem(ctx) {
}
}()
}
c.started = true

<-ctx.Done()
wg.Wait()
return nil
}

// processNextWorkItem processes the next item in the queue. It returns false if
// the queue is shutting down.
func (c *defaultSyncController) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.queue.Get()
if shutdown {
// The queue is shutting down.
return false
}

defer c.queue.Done(obj)

req, ok := obj.(SyncRequest)
if !ok {
log.FromContext(ctx).V(consts.LogLevelDebug).Info(
"Dropping invalid item in queue, expected SyncRequest",
"actual", obj)
c.queue.Forget(obj)
} else {
_ = c.syncHandler(ctx, req)
}

return true
}

// syncHandler handles a single sync request. It delegates the request to the
// Syncer. It returns an error if the sync fails. It also handles enqueuing the
// request if SyncRequest.Request has Requeue or RequeueAfter set. When
// SyncRequest.Delay is set, the sync will happen later.
// Delayed requests are scheduled in the future, typically those would be
// scheduled outside a Reconciler's Reconile method.
func (c *defaultSyncController) syncHandler(ctx context.Context, req SyncRequest) error {
// If the request has a delay, we need to requeue it with the delay.
if req.Delay > 0 {
c.queue.Forget(req)
req.Delay = 0
c.queue.AddAfter(req, req.Delay)
return nil
}

syncID := uuid.NewUUID()
logger := log.FromContext(ctx).WithValues("syncID", syncID,
"name", req.Request.Name,
"namespace", req.Request.Namespace,
)
ctx = log.IntoContext(ctx, logger)
ctx = addSyncID(ctx, syncID)

debugLogger := logger.V(consts.LogLevelDebug)
debugLogger.Info("Syncing")
result, err := c.do.Sync(ctx, req)
if err != nil {
if req.RequeueOnErr && !errors.Is(err, reconcile.TerminalError(nil)) {
c.queue.AddRateLimited(req)
}
logger.Error(err, "Sync error")
return err
}

switch {
case result.RequeueAfter > 0:
// RequeueAfter is set, requeue the request after the delay.
debugLogger.Info(
"Sync done", "horizon", result.RequeueAfter)
c.queue.Forget(req)
c.queue.AddAfter(req, result.RequeueAfter)
case result.Requeue:
// Requeue is set, requeue the request.
debugLogger.Info("Sync done, requeue rate limited")
c.queue.AddRateLimited(req)
default:
// Forget the request to avoid tracking failures forever.
debugLogger.Info("Sync successful")
c.queue.Forget(req)
}

return nil
}

// syncIDKey is the context key for the sync ID. It is used to correlate log messages.
type syncIDKey struct{}

// addSyncID adds the sync ID to the context. It is used to correlate log messages.
func addSyncID(ctx context.Context, reconcileID types.UID) context.Context {
return context.WithValue(ctx, syncIDKey{}, reconcileID)
}
Loading
Loading