From 0d61cd60c78259819202fc64abbc65ec441d66e0 Mon Sep 17 00:00:00 2001 From: Nikolay Mateev Date: Tue, 11 Feb 2020 20:54:47 +0200 Subject: [PATCH] Operations Maintainer (#433) --- Gopkg.lock | 40 +-- application.yml | 6 +- config/config_test.go | 11 +- operations/config.go | 57 ++- operations/maintainer.go | 324 +++++++++++++++--- operations/scheduler.go | 36 +- pkg/sm/sm.go | 8 +- storage/encrypting_repository.go | 19 +- .../smaap_service_binding_interceptor.go | 10 +- .../smaap_service_instance_interceptor.go | 10 +- storage/postgres/keystore.go | 42 +-- storage/postgres/keystore_test.go | 54 --- storage/postgres/locker.go | 168 +++++++++ storage/postgres/locker_test.go | 160 +++++++++ storage/postgres/storage.go | 1 - storage/postgres/storage_test.go | 17 +- test/broker_test/broker_test.go | 2 +- test/common/test_context.go | 15 +- test/configuration_test/configuration_test.go | 7 +- test/operations_test/operations_test.go | 59 +++- test/platform_test/platform_test.go | 2 +- .../service_binding_test.go | 231 ++++++++++++- .../service_instance_test.go | 290 ++++++++++++++-- .../service_offering_test.go | 2 +- test/service_plan_test/service_plan_test.go | 2 +- test/test.go | 36 +- test/visibility_test/visibility_test.go | 2 +- 27 files changed, 1295 insertions(+), 316 deletions(-) create mode 100644 storage/postgres/locker.go create mode 100644 storage/postgres/locker_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 9237a9189..813cff0d9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -163,12 +163,12 @@ revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998" [[projects]] - digest = "1:573ca21d3669500ff845bdebee890eb7fc7f0f50c59f2132f2a0c6b03d85086a" + digest = "1:d1e35b720b5f5156502ffdd174000c81295919293735342da78582e4dc7a8bcd" name = "github.com/golang/protobuf" packages = ["proto"] pruneopts = "UT" - revision = "6c65a5562fc06764971b7c5d05c76c75e84bdbf7" - version = "v1.3.2" + revision = "d23c5127dc24889085f8ccea5c9d560a57a879d8" + version = "v1.3.3" [[projects]] digest = "1:a63cff6b5d8b95638bfe300385d93b2a6d9d687734b863da8e09dc834510a690" @@ -256,7 +256,7 @@ revision = "2ba0fc60eb4a54030f3a6d73ff0a047349c7eeca" [[projects]] - digest = "1:cb7edefacdcbfd95b7611c11b3b027404fa39a66fdc91f6366e1811cbdb5cd3e" + digest = "1:1d965d8955ca44e9c963e24fdf0517033ca1820b6643ec546106dc2040ed54a0" name = "github.com/klauspost/compress" packages = [ "flate", @@ -264,8 +264,8 @@ "zlib", ] pruneopts = "UT" - revision = "459b83aadb42b806aed42f0f4b3240c8834e0cc1" - version = "v1.9.8" + revision = "b9d5dc7bd435c628cb0c658ce7f556409007ea71" + version = "v1.10.0" [[projects]] digest = "1:31e761d97c76151dde79e9d28964a812c46efc5baee4085b86f68f0c654450de" @@ -328,7 +328,7 @@ version = "v0.4.1" [[projects]] - digest = "1:ad7bc85a2256ae7246450290c96619db309d8d8f1918a3163e9af90ef1d7a077" + digest = "1:13b913f297752cf0f236c0ab940d0ffaac64e5a97b7bb68c659a9f734bd32a89" name = "github.com/onsi/ginkgo" packages = [ ".", @@ -352,8 +352,8 @@ "types", ] pruneopts = "UT" - revision = "388ac7e50a3abf0798010091d5094171f4aefc0b" - version = "v1.11.0" + revision = "40598150331533e3cd497f21dcce387dae84b561" + version = "v1.12.0" [[projects]] digest = "1:e42321c3ec0ff36c0644da60c2c1469886b214134286f4610199b704619e11a3" @@ -475,12 +475,12 @@ version = "v1.4.0" [[projects]] - digest = "1:fcea9aca14ce388baeb3afd42bc3302089393f78f9531212aed88d60a134a921" + digest = "1:13503b1b68ee704913caf452f02968fa313a9934e67d951ed0d39ca8e230d5e0" name = "github.com/tidwall/gjson" packages = ["."] pruneopts = "UT" - revision = 
"d10932a0d0b5f1618759b6259b05f7cb7bea0c25" - version = "v1.4.0" + revision = "0360deb6d803e8c271363ce5f6c85d6cd843a3a0" + version = "v1.5.0" [[projects]] digest = "1:8453ddbed197809ee8ca28b06bd04e127bec9912deb4ba451fea7a1eca578328" @@ -491,12 +491,12 @@ version = "v1.0.1" [[projects]] - digest = "1:ddfe0a54e5f9b29536a6d7b2defa376f2cb2b6e4234d676d7ff214d5b097cb50" + digest = "1:f63bab79e68e805cdd9bf70daa09e8c430cfbddf29d14b567a92fb12581b9b95" name = "github.com/tidwall/pretty" packages = ["."] pruneopts = "UT" - revision = "1166b9ac2b65e46a43d8618d30d1554f4652d49b" - version = "v1.0.0" + revision = "b2475501f89994f7ea30b3c94ba86b49079961fe" + version = "v1.0.1" [[projects]] digest = "1:b70c951ba6fdeecfbd50dabe95aa5e1b973866ae9abbece46ad60348112214f2" @@ -586,11 +586,11 @@ "pbkdf2", ] pruneopts = "UT" - revision = "530e935923ad688be97c15eeb8e5ee42ebf2b54a" + revision = "86ce3cb696783b739e41e834e2eead3e1b4aa3fb" [[projects]] branch = "master" - digest = "1:317bb86f606d3cdb6b282cea377e61ce50023b16bdc72d2edfb2ef0164f18364" + digest = "1:7217cd222c72ced17b355f472688cd619e80c8b2e811cbe8b68b739091721173" name = "golang.org/x/net" packages = [ "context", @@ -602,7 +602,7 @@ "publicsuffix", ] pruneopts = "UT" - revision = "6afb5195e5aab057fda82e27171243402346b0ad" + revision = "16171245cfb220d5317888b716d69c1fb4e7992b" [[projects]] branch = "master" @@ -617,11 +617,11 @@ [[projects]] branch = "master" - digest = "1:8a44970c7e8c0a1c8646af14605c1ffd31374074d68101c2e11d7761df12c9d1" + digest = "1:72b7c210f8cfe1431d2f300fbf37f25e52aa77324b05ab6b698483054033803e" name = "golang.org/x/sys" packages = ["unix"] pruneopts = "UT" - revision = "e047566fdf82409bf7a52212cf71df83ea2772fb" + revision = "d101bd2416d505c0448a6ce8a282482678040a89" [[projects]] digest = "1:28deae5fe892797ff37a317b5bcda96d11d1c90dadd89f1337651df3bc4c586e" diff --git a/application.yml b/application.yml index 15796f5c4..90b1b0842 100644 --- a/application.yml +++ b/application.yml @@ -14,7 +14,7 @@ websocket: ping_timeout: 6000ms write_timeout: 6000ms log: - level: error + level: debug format: kibana storage: # name: sm-postgres @@ -27,8 +27,8 @@ api: client_id: cf operations: cleanup_interval: 30m - job_timeout: 12m - scheduled_deletion_timeout: 12h + action_timeout: 12m + reconciliation_operation_timeout: 12h polling_interval: 5s rescheduling_interval: 5s pools: diff --git a/config/config_test.go b/config/config_test.go index 0d6ab4c50..2735f10e5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -242,7 +242,7 @@ var _ = Describe("config", func() { Context("when operation job timeout is < 0", func() { It("returns an error", func() { - config.Operations.JobTimeout = -time.Second + config.Operations.ActionTimeout = -time.Second assertErrorDuringValidate() }) }) @@ -254,16 +254,9 @@ var _ = Describe("config", func() { }) }) - Context("when operation mark orphans interval is < 0", func() { - It("returns an error", func() { - config.Operations.MarkOrphansInterval = -time.Second - assertErrorDuringValidate() - }) - }) - Context("when operation scheduled deletion timeoutt is < 0", func() { It("returns an error", func() { - config.Operations.ScheduledDeletionTimeout = -time.Second + config.Operations.ReconciliationOperationTimeout = -time.Second assertErrorDuringValidate() }) }) diff --git a/operations/config.go b/operations/config.go index 139ffdd4a..430b188e6 100644 --- a/operations/config.go +++ b/operations/config.go @@ -24,55 +24,52 @@ import ( const ( minTimePeriod = time.Nanosecond - defaultMarkOrphansInterval = 24 * 
time.Hour - defaultJobTimeout = 7*24*time.Hour - 1*time.Hour + defaultActionTimeout = 12 * time.Hour + defaultOperationLifespan = 7 * 24 * time.Hour - defaultCleanupInterval = 1 * time.Hour - defaultExpirationTime = 7 * 24 * time.Hour + defaultCleanupInterval = 24 * time.Hour ) // Settings type to be loaded from the environment type Settings struct { - JobTimeout time.Duration `mapstructure:"job_timeout" description:"timeout for async operations"` - MarkOrphansInterval time.Duration `mapstructure:"mark_orphans_interval" description:"interval denoting how often to mark orphan operations as failed"` - CleanupInterval time.Duration `mapstructure:"cleanup_interval" description:"cleanup interval of old operations"` - ExpirationTime time.Duration `mapstructure:"expiration_time" description:"after that time is passed since its creation, the operation can be cleaned up by the maintainer"` - DefaultPoolSize int `mapstructure:"default_pool_size" description:"default worker pool size"` - Pools []PoolSettings `mapstructure:"pools" description:"defines the different available worker pools"` - - ScheduledDeletionTimeout time.Duration `mapstructure:"scheduled_deletion_timeout" description:"the maximum allowed timeout for auto rescheduling of operation actions"` - ReschedulingInterval time.Duration `mapstructure:"rescheduling_interval" description:"the interval between auto rescheduling of operation actions"` - PollingInterval time.Duration `mapstructure:"polling_interval" description:"the interval between polls for async requests"` + ActionTimeout time.Duration `mapstructure:"action_timeout" description:"timeout for async operations"` + ReconciliationOperationTimeout time.Duration `mapstructure:"reconciliation_operation_timeout" description:"the maximum allowed timeout for auto rescheduling of operation actions"` + + CleanupInterval time.Duration `mapstructure:"cleanup_interval" description:"cleanup interval of old operations"` + Lifespan time.Duration `mapstructure:"lifespan" description:"after that time is passed since its creation, the operation can be cleaned up by the maintainer"` + + ReschedulingInterval time.Duration `mapstructure:"rescheduling_interval" description:"the interval between auto rescheduling of operation actions"` + PollingInterval time.Duration `mapstructure:"polling_interval" description:"the interval between polls for async requests"` + + DefaultPoolSize int `mapstructure:"default_pool_size" description:"default worker pool size"` + Pools []PoolSettings `mapstructure:"pools" description:"defines the different available worker pools"` } // DefaultSettings returns default values for API settings func DefaultSettings() *Settings { return &Settings{ - JobTimeout: defaultJobTimeout, - MarkOrphansInterval: defaultMarkOrphansInterval, - CleanupInterval: defaultCleanupInterval, - ExpirationTime: defaultExpirationTime, - DefaultPoolSize: 20, - Pools: []PoolSettings{}, - ScheduledDeletionTimeout: 12 * time.Hour, - ReschedulingInterval: 1 * time.Second, - PollingInterval: 1 * time.Second, + ActionTimeout: defaultActionTimeout, + CleanupInterval: defaultCleanupInterval, + Lifespan: defaultOperationLifespan, + DefaultPoolSize: 20, + Pools: []PoolSettings{}, + ReconciliationOperationTimeout: defaultOperationLifespan, + + ReschedulingInterval: 1 * time.Second, + PollingInterval: 1 * time.Second, } } // Validate validates the Operations settings func (s *Settings) Validate() error { - if s.JobTimeout <= minTimePeriod { - return fmt.Errorf("validate Settings: JobTimeout must be larger than %s", 
minTimePeriod) - } - if s.MarkOrphansInterval <= minTimePeriod { - return fmt.Errorf("validate Settings: MarkOrphanscInterval must be larger than %s", minTimePeriod) + if s.ActionTimeout <= minTimePeriod { + return fmt.Errorf("validate Settings: ActionTimeout must be larger than %s", minTimePeriod) } if s.CleanupInterval <= minTimePeriod { return fmt.Errorf("validate Settings: CleanupInterval must be larger than %s", minTimePeriod) } - if s.ScheduledDeletionTimeout <= minTimePeriod { - return fmt.Errorf("validate Settings: ScheduledDeletionTimeout must be larger than %s", minTimePeriod) + if s.ReconciliationOperationTimeout <= minTimePeriod { + return fmt.Errorf("validate Settings: ReconciliationOperationTimeout must be larger than %s", minTimePeriod) } if s.ReschedulingInterval <= minTimePeriod { return fmt.Errorf("validate Settings: ReschedulingInterval must be larger than %s", minTimePeriod) diff --git a/operations/maintainer.go b/operations/maintainer.go index 9562f7309..dba0ac4ad 100644 --- a/operations/maintainer.go +++ b/operations/maintainer.go @@ -18,6 +18,7 @@ package operations import ( "context" + "sync" "time" "github.com/Peripli/service-manager/pkg/log" @@ -27,89 +28,303 @@ import ( "github.com/Peripli/service-manager/storage" ) +const ( + initialOperationsLockIndex = 200 + ZeroTime = "0001-01-01 00:00:00+00" +) + +// MaintainerFunctor represents a named maintainer function which runs over a pre-defined period +type MaintainerFunctor struct { + name string + interval time.Duration + execute func() +} + // Maintainer ensures that operations old enough are deleted // and that no orphan operations are left in the DB due to crashes/restarts of SM type Maintainer struct { - smCtx context.Context - repository storage.Repository - jobTimeout time.Duration - markOrphansInterval time.Duration - cleanupInterval time.Duration - operationExpirationTime time.Duration + smCtx context.Context + repository storage.Repository + scheduler *Scheduler + + settings *Settings + wg *sync.WaitGroup + + functors []MaintainerFunctor + operationLockers map[string]storage.Locker } // NewMaintainer constructs a Maintainer -func NewMaintainer(smCtx context.Context, repository storage.Repository, options *Settings) *Maintainer { - return &Maintainer{ - smCtx: smCtx, - repository: repository, - jobTimeout: options.JobTimeout, - markOrphansInterval: options.MarkOrphansInterval, - cleanupInterval: options.CleanupInterval, - operationExpirationTime: options.ExpirationTime, +func NewMaintainer(smCtx context.Context, repository storage.TransactionalRepository, lockerCreatorFunc storage.LockerCreatorFunc, options *Settings, wg *sync.WaitGroup) *Maintainer { + maintainer := &Maintainer{ + smCtx: smCtx, + repository: repository, + scheduler: NewScheduler(smCtx, repository, options, options.DefaultPoolSize, wg), + settings: options, + wg: wg, } + + maintainer.functors = []MaintainerFunctor{ + { + name: "cleanupExternalOperations", + execute: maintainer.cleanupExternalOperations, + interval: options.CleanupInterval, + }, + { + name: "cleanupInternalSuccessfulOperations", + execute: maintainer.cleanupInternalSuccessfulOperations, + interval: options.CleanupInterval, + }, + { + name: "cleanupInternalFailedOperations", + execute: maintainer.cleanupInternalFailedOperations, + interval: options.CleanupInterval, + }, + { + name: "markOrphanOperationsFailed", + execute: maintainer.markOrphanOperationsFailed, + interval: options.CleanupInterval, + }, + { + name: "rescheduleUnprocessedOperations", + execute: 
maintainer.rescheduleUnprocessedOperations, + interval: options.ActionTimeout / 2, + }, + { + name: "rescheduleOrphanMitigationOperations", + execute: maintainer.rescheduleOrphanMitigationOperations, + interval: options.ActionTimeout / 2, + }, + } + + operationLockers := make(map[string]storage.Locker) + advisoryLockStartIndex := initialOperationsLockIndex + for _, functor := range maintainer.functors { + operationLockers[functor.name] = lockerCreatorFunc(advisoryLockStartIndex) + advisoryLockStartIndex++ + } + + maintainer.operationLockers = operationLockers + + return maintainer } // Run starts the two recurring jobs responsible for cleaning up operations which are too old // and deleting orphan operations func (om *Maintainer) Run() { - om.cleanUpOldOperations() - om.markOrphanOperationsFailed() + for _, functor := range om.functors { + functor := functor + maintainerFunc := func() { + log.C(om.smCtx).Infof("Attempting to retrieve lock for maintainer functor (%s)", functor.name) + err := om.operationLockers[functor.name].TryLock(om.smCtx) + if err != nil { + log.C(om.smCtx).Infof("Failed to retrieve lock for maintainer functor (%s): %s", functor.name, err) + return + } + defer func() { + err := om.operationLockers[functor.name].Unlock(om.smCtx) + log.C(om.smCtx).Warnf("Could not unlock for maintainer functor (%s): %s", functor.name, err) + }() + log.C(om.smCtx).Infof("Successfully retrieved lock for maintainer functor (%s)", functor.name) + + functor.execute() + } - go om.processOldOperations() - go om.processOrphanOperations() + go maintainerFunc() + go om.processOperations(maintainerFunc, functor.name, functor.interval) + } } -// processOldOperations cleans up periodically all operations which are older than some specified time -func (om *Maintainer) processOldOperations() { - ticker := time.NewTicker(om.cleanupInterval) +func (om *Maintainer) processOperations(functor func(), functorName string, interval time.Duration) { + ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: - om.cleanUpOldOperations() + func() { + om.wg.Add(1) + defer om.wg.Done() + log.C(om.smCtx).Infof("Starting execution of maintainer functor (%s)", functorName) + functor() + log.C(om.smCtx).Infof("Finished execution of maintainer functor (%s)", functorName) + }() case <-om.smCtx.Done(): ticker.Stop() - log.C(om.smCtx).Info("Server is shutting down. Stopping old operations maintainer...") + log.C(om.smCtx).Info("Server is shutting down. Stopping operations maintainer...") return } } } -// processOrphanOperations periodically checks for operations which are stuck in state IN_PROGRESS and updates their status to FAILED -func (om *Maintainer) processOrphanOperations() { - ticker := time.NewTicker(om.markOrphansInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - om.markOrphanOperationsFailed() - case <-om.smCtx.Done(): - ticker.Stop() - log.C(om.smCtx).Info("Server is shutting down. 
Stopping stuck operations maintainer...") - return - } +// cleanUpExternalOperations cleans up periodically all external operations which are older than some specified time +func (om *Maintainer) cleanupExternalOperations() { + criteria := []query.Criterion{ + query.ByField(query.NotEqualsOperator, "platform_id", types.SMPlatform), + // check if operation hasn't been updated for the operation's maximum allowed time to live in DB + query.ByField(query.LessThanOperator, "updated_at", util.ToRFCNanoFormat(time.Now().Add(-om.settings.Lifespan))), } + + if err := om.repository.Delete(om.smCtx, types.OperationType, criteria...); err != nil && err != util.ErrNotFoundInStorage { + log.D().Debugf("Failed to cleanup operations: %s", err) + return + } + log.D().Debug("Finished cleaning up external operations") } -func (om *Maintainer) cleanUpOldOperations() { +// cleanupInternalSuccessfulOperations cleans up all successful internal operations which are older than some specified time +func (om *Maintainer) cleanupInternalSuccessfulOperations() { criteria := []query.Criterion{ + query.ByField(query.EqualsOperator, "platform_id", types.SMPlatform), + query.ByField(query.EqualsOperator, "state", string(types.SUCCEEDED)), + // check if operation hasn't been updated for the operation's maximum allowed time to live in DB + query.ByField(query.LessThanOperator, "updated_at", util.ToRFCNanoFormat(time.Now().Add(-om.settings.Lifespan))), + } + + if err := om.repository.Delete(om.smCtx, types.OperationType, criteria...); err != nil && err != util.ErrNotFoundInStorage { + log.D().Debugf("Failed to cleanup operations: %s", err) + return + } + log.D().Debug("Finished cleaning up successful internal operations") +} + +// cleanupInternalFailedOperations cleans up all failed internal operations which are older than some specified time +func (om *Maintainer) cleanupInternalFailedOperations() { + criteria := []query.Criterion{ + query.ByField(query.EqualsOperator, "platform_id", types.SMPlatform), query.ByField(query.EqualsOperator, "state", string(types.FAILED)), - query.ByField(query.NotEqualsOperator, "platform_id", types.SMPlatform), - query.ByField(query.LessThanOperator, "created_at", util.ToRFCNanoFormat(time.Now().Add(-om.operationExpirationTime))), + query.ByField(query.EqualsOperator, "reschedule", "false"), + query.ByField(query.EqualsOperator, "deletion_scheduled", ZeroTime), + // check if operation hasn't been updated for the operation's maximum allowed time to live in DB + query.ByField(query.LessThanOperator, "updated_at", util.ToRFCNanoFormat(time.Now().Add(-om.settings.Lifespan))), } - if err := om.repository.Delete(om.smCtx, types.OperationType, criteria...); err != nil { + if err := om.repository.Delete(om.smCtx, types.OperationType, criteria...); err != nil && err != util.ErrNotFoundInStorage { log.D().Debugf("Failed to cleanup operations: %s", err) return } - log.D().Debug("Successfully cleaned up operations") + log.D().Debug("Finished cleaning up failed internal operations") +} + +// rescheduleUnprocessedOperations reschedules IN_PROGRESS operations which are reschedulable, not scheduled for deletion and no goroutine is processing at the moment +func (om *Maintainer) rescheduleUnprocessedOperations() { + criteria := []query.Criterion{ + query.ByField(query.EqualsOperator, "platform_id", types.SMPlatform), + query.ByField(query.EqualsOperator, "state", string(types.IN_PROGRESS)), + query.ByField(query.EqualsOperator, "reschedule", "true"), + query.ByField(query.EqualsOperator, "deletion_scheduled", 
ZeroTime), + // check if operation hasn't been updated for the operation's maximum allowed time to execute + query.ByField(query.LessThanOperator, "updated_at", util.ToRFCNanoFormat(time.Now().Add(-om.settings.ActionTimeout))), + // check if operation is still eligible for processing + query.ByField(query.GreaterThanOperator, "created_at", util.ToRFCNanoFormat(time.Now().Add(-om.settings.ReconciliationOperationTimeout))), + } + + objectList, err := om.repository.List(om.smCtx, types.OperationType, criteria...) + if err != nil { + log.D().Debugf("Failed to fetch unprocessed operations: %s", err) + return + } + + operations := objectList.(*types.Operations) + for i := 0; i < operations.Len(); i++ { + operation := operations.ItemAt(i).(*types.Operation) + logger := log.ForContext(om.smCtx).WithField(log.FieldCorrelationID, operation.CorrelationID) + + var action storageAction + + switch operation.Type { + case types.CREATE: + object, err := om.repository.Get(om.smCtx, operation.ResourceType, query.ByField(query.EqualsOperator, "id", operation.ResourceID)) + if err != nil { + logger.Warnf("Failed to fetch resource with ID (%s) for operation with ID (%s): %s", operation.ResourceID, operation.ID, err) + return + } + + action = func(ctx context.Context, repository storage.Repository) (types.Object, error) { + object, err := repository.Create(ctx, object) + return object, util.HandleStorageError(err, operation.ResourceType.String()) + } + /* TODO: Uncomment and adapt once update flow is enabled + case types.UPDATE: + action = func(ctx context.Context, repository storage.Repository) (types.Object, error) { + object, err := repository.Update(ctx, objFromDB, labelChanges, criteria...) + return object, util.HandleStorageError(err, operation.ResourceType.String()) + } + */ + case types.DELETE: + byID := query.ByField(query.EqualsOperator, "id", operation.ResourceID) + + action = func(ctx context.Context, repository storage.Repository) (types.Object, error) { + err := repository.Delete(ctx, operation.ResourceType, byID) + if err != nil { + if err == util.ErrNotFoundInStorage { + return nil, nil + } + return nil, util.HandleStorageError(err, operation.ResourceType.String()) + } + return nil, nil + } + } + + if err := om.scheduler.ScheduleAsyncStorageAction(om.smCtx, operation, action); err != nil { + logger.Warnf("Failed to reschedule unprocessed operation with ID (%s): %s", operation.ID, err) + } + } + + log.D().Debug("Finished rescheduling unprocessed operations") } +// rescheduleOrphanMitigationOperations reschedules orphan mitigation operations which no goroutine is processing at the moment +func (om *Maintainer) rescheduleOrphanMitigationOperations() { + criteria := []query.Criterion{ + query.ByField(query.EqualsOperator, "platform_id", types.SMPlatform), + query.ByField(query.NotEqualsOperator, "deletion_scheduled", ZeroTime), + // check if operation hasn't been updated for the operation's maximum allowed time to execute + query.ByField(query.LessThanOperator, "updated_at", util.ToRFCNanoFormat(time.Now().Add(-om.settings.ActionTimeout))), + // check if operation is still eligible for processing + query.ByField(query.GreaterThanOperator, "created_at", util.ToRFCNanoFormat(time.Now().Add(-om.settings.ReconciliationOperationTimeout))), + } + + objectList, err := om.repository.List(om.smCtx, types.OperationType, criteria...) 
+ if err != nil { + log.D().Debugf("Failed to fetch unprocessed orphan mitigation operations: %s", err) + return + } + + operations := objectList.(*types.Operations) + for i := 0; i < operations.Len(); i++ { + operation := operations.ItemAt(i).(*types.Operation) + logger := log.ForContext(om.smCtx).WithField(log.FieldCorrelationID, operation.CorrelationID) + + byID := query.ByField(query.EqualsOperator, "id", operation.ResourceID) + + action := func(ctx context.Context, repository storage.Repository) (types.Object, error) { + err := repository.Delete(ctx, operation.ResourceType, byID) + if err != nil { + if err == util.ErrNotFoundInStorage { + return nil, nil + } + return nil, util.HandleStorageError(err, operation.ResourceType.String()) + } + return nil, nil + } + + if err := om.scheduler.ScheduleAsyncStorageAction(om.smCtx, operation, action); err != nil { + logger.Warnf("Failed to reschedule unprocessed orphan mitigation operation with ID (%s): %s", operation.ID, err) + } + } + + log.D().Debug("Finished rescheduling unprocessed orphan mitigation operations") +} + +// markOrphanOperationsFailed checks for operations which are stuck in state IN_PROGRESS, updates their status to FAILED and schedules a delete action func (om *Maintainer) markOrphanOperationsFailed() { criteria := []query.Criterion{ + query.ByField(query.EqualsOperator, "platform_id", types.SMPlatform), query.ByField(query.EqualsOperator, "state", string(types.IN_PROGRESS)), - query.ByField(query.LessThanOperator, "created_at", util.ToRFCNanoFormat(time.Now().Add(-om.jobTimeout))), + query.ByField(query.EqualsOperator, "reschedule", "false"), + query.ByField(query.EqualsOperator, "deletion_scheduled", ZeroTime), + // check if operation hasn't been updated for the operation's maximum allowed time to execute + query.ByField(query.LessThanOperator, "updated_at", util.ToRFCNanoFormat(time.Now().Add(-om.settings.ActionTimeout))), } objectList, err := om.repository.List(om.smCtx, types.OperationType, criteria...) 
@@ -121,12 +336,31 @@ func (om *Maintainer) markOrphanOperationsFailed() { operations := objectList.(*types.Operations) for i := 0; i < operations.Len(); i++ { operation := operations.ItemAt(i).(*types.Operation) - operation.State = types.FAILED + logger := log.ForContext(om.smCtx).WithField(log.FieldCorrelationID, operation.CorrelationID) + + operation.DeletionScheduled = time.Now() if _, err := om.repository.Update(om.smCtx, operation, query.LabelChanges{}); err != nil { - log.D().Debugf("Failed to update orphan operation with ID (%s) state to FAILED: %s", operation.ID, err) + logger.Warnf("Failed to update orphan operation with ID (%s) state to FAILED: %s", operation.ID, err) + continue + } + + byID := query.ByField(query.EqualsOperator, "id", operation.ResourceID) + action := func(ctx context.Context, repository storage.Repository) (types.Object, error) { + err := repository.Delete(ctx, operation.ResourceType, byID) + if err != nil { + if err == util.ErrNotFoundInStorage { + return nil, nil + } + return nil, util.HandleStorageError(err, operation.ResourceType.String()) + } + return nil, nil + } + + if err := om.scheduler.ScheduleAsyncStorageAction(om.smCtx, operation, action); err != nil { + logger.Warnf("Failed to schedule delete action for operation with ID (%s): %s", operation.ID, err) } } - log.D().Debug("Successfully marked orphan operations as failed") + log.D().Debug("Finished marking orphan operations as failed") } diff --git a/operations/scheduler.go b/operations/scheduler.go index ac0d1334e..3eaf50f75 100644 --- a/operations/scheduler.go +++ b/operations/scheduler.go @@ -37,25 +37,25 @@ type storageAction func(ctx context.Context, repository storage.Repository) (typ // Scheduler is responsible for storing Operation entities in the DB // and also for spawning goroutines to execute the respective DB transaction asynchronously type Scheduler struct { - smCtx context.Context - repository storage.TransactionalRepository - workers chan struct{} - jobTimeout time.Duration - deletionTimeout time.Duration - reschedulingDelay time.Duration - wg *sync.WaitGroup + smCtx context.Context + repository storage.TransactionalRepository + workers chan struct{} + actionTimeout time.Duration + reconciliationOperationTimeout time.Duration + reschedulingDelay time.Duration + wg *sync.WaitGroup } // NewScheduler constructs a Scheduler func NewScheduler(smCtx context.Context, repository storage.TransactionalRepository, settings *Settings, poolSize int, wg *sync.WaitGroup) *Scheduler { return &Scheduler{ - smCtx: smCtx, - repository: repository, - workers: make(chan struct{}, poolSize), - jobTimeout: settings.JobTimeout, - deletionTimeout: settings.ScheduledDeletionTimeout, - reschedulingDelay: settings.ReschedulingInterval, - wg: wg, + smCtx: smCtx, + repository: repository, + workers: make(chan struct{}, poolSize), + actionTimeout: settings.ActionTimeout, + reconciliationOperationTimeout: settings.ReconciliationOperationTimeout, + reschedulingDelay: settings.ReschedulingInterval, + wg: wg, } } @@ -125,7 +125,7 @@ func (s *Scheduler) ScheduleAsyncStorageAction(ctx context.Context, operation *t return } - stateCtxWithOpAndTimeout, timeoutCtxCancel := context.WithTimeout(stateCtxWithOp, s.jobTimeout) + stateCtxWithOpAndTimeout, timeoutCtxCancel := context.WithTimeout(stateCtxWithOp, s.actionTimeout) defer timeoutCtxCancel() go func() { select { @@ -180,7 +180,7 @@ func (s *Scheduler) checkForConcurrentOperations(ctx context.Context, operation // for the outside world job timeout would have expired if 
the last update happened > job timeout time ago (this is worst case) // an "old" updated_at means that for a while nobody was processing this operation - isLastOpInProgress := lastOperation.State == types.IN_PROGRESS && time.Now().Before(lastOperation.UpdatedAt.Add(s.jobTimeout)) + isLastOpInProgress := lastOperation.State == types.IN_PROGRESS && time.Now().Before(lastOperation.UpdatedAt.Add(s.actionTimeout)) isAReschedule := lastOperation.Reschedule && operation.Reschedule @@ -402,7 +402,7 @@ func (s *Scheduler) handleActionResponse(ctx context.Context, actionObject types func (s *Scheduler) handleActionResponseFailure(ctx context.Context, actionError error, opAfterJob *types.Operation) error { if err := s.repository.InTransaction(ctx, func(ctx context.Context, storage storage.Repository) error { - if opErr := updateOperationState(ctx, s.repository, opAfterJob, types.FAILED, actionError); opErr != nil { + if opErr := updateOperationState(ctx, storage, opAfterJob, types.FAILED, actionError); opErr != nil { return fmt.Errorf("setting new operation state failed: %s", opErr) } // after a failed FAILED CREATE operation, update the ready field to false @@ -420,7 +420,7 @@ func (s *Scheduler) handleActionResponseFailure(ctx context.Context, actionError // we want to schedule deletion if the operation is marked for deletion and the deletion timeout is not yet reached isDeleteRescheduleRequired := !opAfterJob.DeletionScheduled.IsZero() && - time.Now().UTC().Before(opAfterJob.DeletionScheduled.Add(s.deletionTimeout)) && + time.Now().UTC().Before(opAfterJob.DeletionScheduled.Add(s.reconciliationOperationTimeout)) && opAfterJob.State != types.SUCCEEDED if isDeleteRescheduleRequired { diff --git a/pkg/sm/sm.go b/pkg/sm/sm.go index b58c65be1..2ba475f20 100644 --- a/pkg/sm/sm.go +++ b/pkg/sm/sm.go @@ -106,7 +106,7 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg } // Decorate the storage with credentials encryption/decryption - encryptingDecorator := storage.EncryptingDecorator(ctx, &security.AESEncrypter{}, smStorage) + encryptingDecorator := storage.EncryptingDecorator(ctx, &security.AESEncrypter{}, smStorage, postgres.EncryptingLocker(smStorage)) // Initialize the storage with graceful termination var transactionalRepository storage.TransactionalRepository @@ -155,7 +155,11 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg Settings: *cfg.Storage, } - operationMaintainer := operations.NewMaintainer(ctx, interceptableRepository, cfg.Operations) + postgresLockerCreatorFunc := func(advisoryIndex int) storage.Locker { + return &postgres.Locker{Storage: smStorage, AdvisoryIndex: advisoryIndex} + } + + operationMaintainer := operations.NewMaintainer(ctx, interceptableRepository, postgresLockerCreatorFunc, cfg.Operations, waitGroup) osbClientProvider := osb.NewBrokerClientProvider(cfg.HTTPClient.SkipSSLValidation, int(cfg.HTTPClient.ResponseHeaderTimeout.Seconds())) smb := &ServiceManagerBuilder{ diff --git a/storage/encrypting_repository.go b/storage/encrypting_repository.go index 3d3e1849a..6e317c53c 100644 --- a/storage/encrypting_repository.go +++ b/storage/encrypting_repository.go @@ -14,14 +14,23 @@ import ( "github.com/Peripli/service-manager/pkg/types" ) -// KeyStore interface for encryption key operations -type KeyStore interface { +// LockerCreatorFunc is a function building a storage.Locker with a specific advisory index +type LockerCreatorFunc func(advisoryIndex int) Locker + +// Locker provides basic Lock/Unlock functionality 
+type Locker interface { // Lock locks the storage so that only one process can manipulate the encryption key. Returns an error if the process has already acquired the lock Lock(ctx context.Context) error + // TryLock tries to lock the storage so that only one process can manipulate the encryption key. Returns an error if the process has already acquired the lock + TryLock(ctx context.Context) error + // Unlock releases the acquired lock. Unlock(ctx context.Context) error +} +// KeyStore interface for encryption key operations +type KeyStore interface { // GetEncryptionKey returns the encryption key from the storage after applying the specified transformation function GetEncryptionKey(ctx context.Context, transformationFunc func(context.Context, []byte, []byte) ([]byte, error)) ([]byte, error) @@ -30,16 +39,16 @@ type KeyStore interface { } // EncryptingDecorator creates a TransactionalRepositoryDecorator that can be used to add encrypting/decrypting logic to a TransactionalRepository -func EncryptingDecorator(ctx context.Context, encrypter security.Encrypter, keyStore KeyStore) TransactionalRepositoryDecorator { +func EncryptingDecorator(ctx context.Context, encrypter security.Encrypter, keyStore KeyStore, locker Locker) TransactionalRepositoryDecorator { return func(next TransactionalRepository) (TransactionalRepository, error) { ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Second) defer cancelFunc() - if err := keyStore.Lock(ctx); err != nil { + if err := locker.Lock(ctx); err != nil { return nil, err } defer func() { - if err := keyStore.Unlock(ctx); err != nil { + if err := locker.Unlock(ctx); err != nil { log.C(ctx).WithError(err).Error("error while unlocking keystore") } }() diff --git a/storage/interceptors/smaap_service_binding_interceptor.go b/storage/interceptors/smaap_service_binding_interceptor.go index 1492a26cc..053a468a4 100644 --- a/storage/interceptors/smaap_service_binding_interceptor.go +++ b/storage/interceptors/smaap_service_binding_interceptor.go @@ -203,13 +203,13 @@ func (i *ServiceBindingInterceptor) AroundTxCreate(f storage.InterceptCreateArou log.C(ctx).Infof("Successful synchronous bind %s to broker %s returned response %s", logBindRequest(bindRequest), broker.Name, logBindResponse(bindResponse)) } - } - object, err := f(ctx, obj) - if err != nil { - return nil, err + object, err := f(ctx, obj) + if err != nil { + return nil, err + } + binding = object.(*types.ServiceBinding) } - binding = object.(*types.ServiceBinding) if operation.Reschedule { if err := i.pollServiceBinding(ctx, osbClient, binding, operation, broker.ID, service.CatalogID, plan.CatalogID, operation.ExternalID, true); err != nil { diff --git a/storage/interceptors/smaap_service_instance_interceptor.go b/storage/interceptors/smaap_service_instance_interceptor.go index 6d5465b33..de3bef076 100644 --- a/storage/interceptors/smaap_service_instance_interceptor.go +++ b/storage/interceptors/smaap_service_instance_interceptor.go @@ -183,13 +183,13 @@ func (i *ServiceInstanceInterceptor) AroundTxCreate(f storage.InterceptCreateAro logProvisionRequest(provisionRequest), broker.Name, logProvisionResponse(provisionResponse)) } - } - object, err := f(ctx, obj) - if err != nil { - return nil, err + object, err := f(ctx, obj) + if err != nil { + return nil, err + } + instance = object.(*types.ServiceInstance) } - instance = object.(*types.ServiceInstance) if operation.Reschedule { if err := i.pollServiceInstance(ctx, osbClient, instance, operation, broker.ID, service.CatalogID, plan.CatalogID, 
operation.ExternalID, true); err != nil { diff --git a/storage/postgres/keystore.go b/storage/postgres/keystore.go index 001ac26f4..523efcfeb 100644 --- a/storage/postgres/keystore.go +++ b/storage/postgres/keystore.go @@ -19,7 +19,6 @@ package postgres import ( "context" "database/sql" - "fmt" "time" "github.com/Peripli/service-manager/storage" @@ -33,6 +32,11 @@ const ( SafeTable = "safe" ) +// EncryptingLocker builds an encrypting storage.Locker with the pre-defined lock index +func EncryptingLocker(storage *Storage) storage.Locker { + return &Locker{Storage: storage, AdvisoryIndex: securityLockIndex} +} + // Safe represents a secret entity type Safe struct { Secret []byte `db:"secret"` @@ -72,42 +76,6 @@ func (s *Safe) LabelEntity() PostgresLabel { return nil } -// Lock acquires a database lock so that only one process can manipulate the encryption key. -// Returns an error if the process has already acquired the lock -func (s *Storage) Lock(ctx context.Context) error { - s.checkOpen() - - s.mutex.Lock() - defer s.mutex.Unlock() - if s.isLocked { - return fmt.Errorf("lock is already acquired") - } - if _, err := s.db.ExecContext(ctx, "SELECT pg_advisory_lock($1)", securityLockIndex); err != nil { - return err - } - s.isLocked = true - - return nil -} - -// Unlock releases the database lock. -func (s *Storage) Unlock(ctx context.Context) error { - s.checkOpen() - - s.mutex.Lock() - defer s.mutex.Unlock() - if !s.isLocked { - return nil - } - - if _, err := s.db.ExecContext(ctx, "SELECT pg_advisory_unlock($1)", securityLockIndex); err != nil { - return err - } - s.isLocked = false - - return nil -} - // GetEncryptionKey returns the encryption key used to encrypt the credentials for brokers func (s *Storage) GetEncryptionKey(ctx context.Context, transformationFunc func(context.Context, []byte, []byte) ([]byte, error)) ([]byte, error) { s.checkOpen() diff --git a/storage/postgres/keystore_test.go b/storage/postgres/keystore_test.go index 9d745b875..392d7f891 100644 --- a/storage/postgres/keystore_test.go +++ b/storage/postgres/keystore_test.go @@ -192,60 +192,6 @@ var _ = Describe("Secured Storage", func() { Expect(err).To(HaveOccurred()) }) }) - }) - - Describe("Lock", func() { - AfterEach(func() { - s.Unlock(context.TODO()) - }) - Context("When lock is already acquired", func() { - BeforeEach(func() { - mock.ExpectExec("SELECT pg_advisory_lock*").WillReturnResult(sqlmock.NewResult(1, 1)) - }) - - It("Should return an error", func() { - err := s.Lock(context.TODO()) - Expect(err).ToNot(HaveOccurred()) - - err = s.Lock(context.TODO()) - Expect(err).To(HaveOccurred()) - }) - }) - - Context("When lock is not yet acquired", func() { - BeforeEach(func() { - mock.ExpectExec("SELECT").WillReturnResult(sqlmock.NewResult(int64(1), int64(1))) - }) - - It("Should acquire lock", func() { - err := s.Lock(context.TODO()) - Expect(err).ToNot(HaveOccurred()) - }) - }) - }) - - Describe("Unlock", func() { - Context("When lock is not acquired", func() { - It("Should return nil", func() { - err := s.Unlock(context.TODO()) - Expect(err).ToNot(HaveOccurred()) - }) - }) - - Context("When lock is acquired", func() { - BeforeEach(func() { - mock.ExpectExec("SELECT pg_advisory_lock*").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("SELECT pg_advisory_unlock*").WithArgs(sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(1, 1)) - }) - - It("Should release lock", func() { - err := s.Lock(context.TODO()) - Expect(err).ToNot(HaveOccurred()) - - err = s.Unlock(context.TODO()) - Expect(err).To(BeNil()) - 
}) - }) }) }) diff --git a/storage/postgres/locker.go b/storage/postgres/locker.go new file mode 100644 index 000000000..47dea8d7a --- /dev/null +++ b/storage/postgres/locker.go @@ -0,0 +1,168 @@ +/* + * Copyright 2018 The Service Manager Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package postgres + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/Peripli/service-manager/pkg/log" +) + +var ErrLockAcquisition = errors.New("failed to acquire lock") +var ErrUnlockAcquisition = errors.New("failed to unlock") + +type Locker struct { + *Storage + isLocked bool + AdvisoryIndex int + lockerCon *sql.Conn +} + +// Lock acquires a database lock so that only one process can manipulate the encryption key. +// Returns an error if the process has already acquired the lock +func (l *Locker) Lock(ctx context.Context) error { + log.C(ctx).Infof("Attempting to lock advisory lock with index (%d)", l.AdvisoryIndex) + l.mutex.Lock() + defer l.mutex.Unlock() + if l.isLocked || l.lockerCon != nil { + log.C(ctx).Infof("Locker with advisory index (%d) is locked, so no attempt to lock it", l.AdvisoryIndex) + return fmt.Errorf("lock is already acquired") + } + + var err error + if l.lockerCon, err = l.db.Conn(ctx); err != nil { + return err + } + + log.C(ctx).Infof("Executing lock of locker with advisory index (%d)", l.AdvisoryIndex) + rows, err := l.lockerCon.QueryContext(ctx, "SELECT pg_advisory_lock($1)", l.AdvisoryIndex) + if err != nil { + l.release(ctx) + log.C(ctx).Infof("Failed to lock locker with advisory index (%d)", l.AdvisoryIndex) + return err + } + defer func() { + if err := rows.Close(); err != nil { + log.C(ctx).WithError(err).Error("Could not close rows") + } + }() + + l.isLocked = true + + log.C(ctx).Infof("Successfully locked locker with advisory index (%d)", l.AdvisoryIndex) + return nil +} + +// Lock acquires a database lock so that only one process can manipulate the encryption key. 
+// Returns an error if the process has already acquired the lock +func (l *Locker) TryLock(ctx context.Context) error { + log.C(ctx).Infof("Attempting to try_lock advisory lock with index (%d)", l.AdvisoryIndex) + l.mutex.Lock() + defer l.mutex.Unlock() + if l.isLocked || l.lockerCon != nil { + log.C(ctx).Infof("Locker with advisory index (%d) is locked, so no attempt to try_lock it", l.AdvisoryIndex) + return fmt.Errorf("try_lock is already acquired") + } + + var err error + if l.lockerCon, err = l.db.Conn(ctx); err != nil { + return err + } + + log.C(ctx).Infof("Executing try_lock of locker with advisory index (%d)", l.AdvisoryIndex) + rows, err := l.lockerCon.QueryContext(ctx, "SELECT pg_try_advisory_lock($1)", l.AdvisoryIndex) + if err != nil { + l.release(ctx) + log.C(ctx).Infof("Failed to try_lock locker with advisory index (%d)", l.AdvisoryIndex) + return err + } + defer func() { + if err := rows.Close(); err != nil { + log.C(ctx).WithError(err).Error("Could not close rows") + } + }() + + var locked bool + for rows.Next() { + if err = rows.Scan(&locked); err != nil { + l.release(ctx) + return err + } + } + + if !locked { + l.release(ctx) + log.C(ctx).Infof("Failed to try_lock locker with advisory index (%d) - either already locked or failed to lock", l.AdvisoryIndex) + return ErrLockAcquisition + } + + l.isLocked = true + + log.C(ctx).Infof("Successfully try_locked locker with advisory index (%d)", l.AdvisoryIndex) + return nil +} + +// Unlock releases the database lock. +func (l *Locker) Unlock(ctx context.Context) error { + log.C(ctx).Infof("Attempting to unlock advisory lock with index (%d)", l.AdvisoryIndex) + l.mutex.Lock() + defer l.mutex.Unlock() + if !l.isLocked || l.lockerCon == nil { + log.C(ctx).Infof("Locker with advisory index (%d) is not locked, so no attempt to unlock it", l.AdvisoryIndex) + return nil + } + defer l.release(ctx) + + log.C(ctx).Infof("Executing unlock of locker with advisory index (%d)", l.AdvisoryIndex) + rows, err := l.lockerCon.QueryContext(ctx, "SELECT pg_advisory_unlock($1)", l.AdvisoryIndex) + if err != nil { + log.C(ctx).Infof("Failed to unlock locker with advisory index (%d)", l.AdvisoryIndex) + return err + } + defer func() { + if err := rows.Close(); err != nil { + log.C(ctx).WithError(err).Error("Could not close rows") + } + }() + + var unlocked bool + for rows.Next() { + if err = rows.Scan(&unlocked); err != nil { + return err + } + } + + if !unlocked { + log.C(ctx).Infof("Failed to unlock locker with advisory index (%d) - either already unlocked or failed to unlock", l.AdvisoryIndex) + return ErrUnlockAcquisition + } + + l.isLocked = false + + log.C(ctx).Infof("Successfully unlocked locker with advisory index (%d)", l.AdvisoryIndex) + return nil +} + +func (l *Locker) release(ctx context.Context) { + if err := l.lockerCon.Close(); err != nil { + log.C(ctx).WithError(err).Error("Could not release connection") + } + l.lockerCon = nil +} diff --git a/storage/postgres/locker_test.go b/storage/postgres/locker_test.go new file mode 100644 index 000000000..38203d215 --- /dev/null +++ b/storage/postgres/locker_test.go @@ -0,0 +1,160 @@ +/* + * Copyright 2018 The Service Manager Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package postgres + +import ( + "context" + "crypto/rand" + "database/sql" + + "github.com/Peripli/service-manager/storage" + + "github.com/Peripli/service-manager/pkg/security" + + "github.com/Peripli/service-manager/pkg/security/securityfakes" + + "github.com/DATA-DOG/go-sqlmock" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Storage Locker", func() { + var s *Storage + var locker *Locker + var mockdb *sql.DB + var mock sqlmock.Sqlmock + + var envEncryptionKey []byte + + var fakeEncrypter *securityfakes.FakeEncrypter + + sucessLockRow := func() *sqlmock.Rows { return sqlmock.NewRows([]string{"pg_advisory_lock"}).FromCSVString("true") } + failLockRow := func() *sqlmock.Rows { return sqlmock.NewRows([]string{"pg_advisory_lock"}).FromCSVString("false") } + sucessUnlockRow := func() *sqlmock.Rows { return sqlmock.NewRows([]string{"pg_advisory_unlock"}).FromCSVString("true") } + + BeforeEach(func() { + envEncryptionKey = make([]byte, 32) + _, err := rand.Read(envEncryptionKey) + Expect(err).ToNot(HaveOccurred()) + + mockdb, mock, err = sqlmock.New() + Expect(err).ToNot(HaveOccurred()) + + s = &Storage{ + ConnectFunc: func(driver string, url string) (*sql.DB, error) { + return mockdb, nil + }, + } + locker = &Locker{ + Storage: s, + AdvisoryIndex: 1, + } + mock.ExpectQuery(`SELECT CURRENT_DATABASE()`).WillReturnRows(sqlmock.NewRows([]string{"mock"}).FromCSVString("mock")) + mock.ExpectQuery(`SELECT COUNT(1)*`).WillReturnRows(sqlmock.NewRows([]string{"mock"}).FromCSVString("1")) + mock.ExpectExec("SELECT pg_advisory_lock*").WithArgs(sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery(`SELECT version, dirty FROM "schema_migrations" LIMIT 1`).WillReturnRows(sqlmock.NewRows([]string{"version", "dirty"}).FromCSVString("20200131152000,false")) + mock.ExpectExec("SELECT pg_advisory_unlock*").WithArgs(sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(1, 1)) + + options := storage.DefaultSettings() + options.EncryptionKey = string(envEncryptionKey) + options.URI = "sqlmock://sqlmock" + err = s.Open(options) + Expect(err).ToNot(HaveOccurred()) + + fakeEncrypter = &securityfakes.FakeEncrypter{} + + fakeEncrypter.EncryptCalls(func(ctx context.Context, plainKey []byte, encryptionKey []byte) ([]byte, error) { + encrypter := &security.AESEncrypter{} + return encrypter.Encrypt(ctx, plainKey, encryptionKey) + }) + + fakeEncrypter.DecryptCalls(func(ctx context.Context, encryptedKey []byte, encryptionKey []byte) ([]byte, error) { + encrypter := &security.AESEncrypter{} + return encrypter.Decrypt(ctx, encryptedKey, encryptionKey) + }) + }) + + AfterEach(func() { + s.Close() + }) + + Describe("Lock", func() { + AfterEach(func() { + mock.ExpectQuery("SELECT pg_advisory_unlock*").WithArgs(sqlmock.AnyArg()).WillReturnRows(sucessUnlockRow()) + err := locker.Unlock(context.TODO()) + Expect(err).ShouldNot(HaveOccurred()) + }) + + BeforeEach(func() { + mock.ExpectQuery("SELECT pg_advisory_lock*").WillReturnRows(sucessLockRow()) + }) + + Context("When lock is already acquired", func() { + It("Should return an error", func() { + err := 
locker.Lock(context.TODO()) + Expect(err).ToNot(HaveOccurred()) + + err = locker.Lock(context.TODO()) + Expect(err).To(HaveOccurred()) + }) + }) + + Context("When lock is not yet acquired", func() { + It("Should acquire lock", func() { + err := locker.Lock(context.TODO()) + Expect(err).ToNot(HaveOccurred()) + }) + }) + }) + + Describe("TryLock", func() { + Context("When lock is already acquired by another lock", func() { + BeforeEach(func() { + mock.ExpectQuery("SELECT pg_try_advisory_lock*").WillReturnRows(failLockRow()) + }) + + It("Should return an error", func() { + err := locker.Lock(context.TODO()) + Expect(err).To(HaveOccurred()) + }) + }) + }) + + Describe("Unlock", func() { + Context("When lock is not acquired", func() { + It("Should return nil", func() { + err := locker.Unlock(context.TODO()) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("When lock is acquired", func() { + BeforeEach(func() { + mock.ExpectQuery("SELECT pg_advisory_lock*").WillReturnRows(sucessLockRow()) + mock.ExpectQuery("SELECT pg_advisory_unlock*").WillReturnRows(sucessUnlockRow()) + }) + + It("Should release lock", func() { + err := locker.Lock(context.TODO()) + Expect(err).ToNot(HaveOccurred()) + + err = locker.Unlock(context.TODO()) + Expect(err).To(BeNil()) + }) + }) + }) +}) diff --git a/storage/postgres/storage.go b/storage/postgres/storage.go index 8f86668ef..726203ccb 100644 --- a/storage/postgres/storage.go +++ b/storage/postgres/storage.go @@ -51,7 +51,6 @@ type Storage struct { state *storageState layerOneEncryptionKey []byte scheme *scheme - isLocked bool mutex sync.Mutex } diff --git a/storage/postgres/storage_test.go b/storage/postgres/storage_test.go index ac1cecd98..95625973d 100644 --- a/storage/postgres/storage_test.go +++ b/storage/postgres/storage_test.go @@ -18,6 +18,7 @@ package postgres import ( "context" + "github.com/Peripli/service-manager/storage" . 
"github.com/onsi/ginkgo" @@ -27,22 +28,6 @@ import ( var _ = Describe("Postgres Storage", func() { pgStorage := &Storage{} - Describe("Lock", func() { - Context("Called with uninitialized db", func() { - It("Should panic", func() { - Expect(func() { pgStorage.Lock(context.TODO()) }).To(Panic()) - }) - }) - }) - - Context("Unlock", func() { - Context("Called with uninitialized db", func() { - It("Should panic", func() { - Expect(func() { pgStorage.Unlock(context.TODO()) }).To(Panic()) - }) - }) - }) - Context("GetEncryptionKey", func() { Context("Called with uninitialized db", func() { It("Should panic", func() { diff --git a/test/broker_test/broker_test.go b/test/broker_test/broker_test.go index 78be7cca4..7051a9570 100644 --- a/test/broker_test/broker_test.go +++ b/test/broker_test/broker_test.go @@ -69,7 +69,7 @@ var _ = test.DescribeTestsFor(test.TestCase{ ResourceBlueprint: blueprint(true), ResourceWithoutNullableFieldsBlueprint: blueprint(false), PatchResource: test.APIResourcePatch, - AdditionalTests: func(ctx *common.TestContext) { + AdditionalTests: func(ctx *common.TestContext, t *test.TestCase) { Context("additional non-generic tests", func() { var ( brokerServer *common.BrokerServer diff --git a/test/common/test_context.go b/test/common/test_context.go index 0d2c4d322..2bd82867a 100644 --- a/test/common/test_context.go +++ b/test/common/test_context.go @@ -168,7 +168,7 @@ func NewTestContextBuilder() *TestContextBuilder { Environment: TestEnv, envPostHooks: []func(env env.Environment, servers map[string]FakeServer){ func(env env.Environment, servers map[string]FakeServer) { - env.Set("api.token_issuer_url", servers["oauth-server"].URL()) + env.Set("api.token_issuer_url", servers[OauthServer].URL()) }, func(env env.Environment, servers map[string]FakeServer) { flag.VisitAll(func(flag *flag.Flag) { @@ -183,9 +183,7 @@ func NewTestContextBuilder() *TestContextBuilder { smExtensions: []func(ctx context.Context, smb *sm.ServiceManagerBuilder, env env.Environment) error{}, defaultTokenClaims: make(map[string]interface{}, 0), tenantTokenClaims: make(map[string]interface{}, 0), - Servers: map[string]FakeServer{ - "oauth-server": NewOAuthServer(), - }, + Servers: map[string]FakeServer{}, HttpClient: &http.Client{ Transport: &http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -315,6 +313,7 @@ func (tcb *TestContextBuilder) BuildWithoutCleanup() *TestContext { func (tcb *TestContextBuilder) BuildWithListener(listener net.Listener, cleanup bool) *TestContext { environment := tcb.Environment(tcb.envPreHooks...) 
+ tcb.Servers[OauthServer] = NewOAuthServer() for _, envPostHook := range tcb.envPostHooks { envPostHook(environment, tcb.Servers) } @@ -538,11 +537,17 @@ func (ctx *TestContext) CleanupBroker(id string) { } func (ctx *TestContext) Cleanup() { + ctx.CleanupAll(true) +} + +func (ctx *TestContext) CleanupAll(cleanupResources bool) { if ctx == nil { return } - ctx.CleanupAdditionalResources() + if cleanupResources { + ctx.CleanupAdditionalResources() + } for _, server := range ctx.Servers { server.Close() diff --git a/test/configuration_test/configuration_test.go b/test/configuration_test/configuration_test.go index 0fff9fe2b..fab0d7364 100644 --- a/test/configuration_test/configuration_test.go +++ b/test/configuration_test/configuration_test.go @@ -84,14 +84,13 @@ var _ = Describe("Service Manager Config API", func() { "label_key": "tenant" }, "operations": { - "cleanup_interval": "1h0m0s", + "cleanup_interval": "24h0m0s", "default_pool_size": 20, - "job_timeout": "167h0m0s", - "mark_orphans_interval": "24h0m0s", + "action_timeout": "12h0m0s", "polling_interval": "1ms", "pools": "", "rescheduling_interval": "1ms", - "scheduled_deletion_timeout": "12h0m0s" + "reconciliation_operation_timeout": "168h0m0s" }, "server": { "host": "", diff --git a/test/operations_test/operations_test.go b/test/operations_test/operations_test.go index 7f65cf189..686b63638 100644 --- a/test/operations_test/operations_test.go +++ b/test/operations_test/operations_test.go @@ -57,7 +57,7 @@ var _ = Describe("Operations", func() { Context("Scheduler", func() { BeforeEach(func() { postHook := func(e env.Environment, servers map[string]common.FakeServer) { - e.Set("operations.job_timeout", 5*time.Nanosecond) + e.Set("operations.action_timeout", 5*time.Nanosecond) e.Set("operations.mark_orphans_interval", 1*time.Hour) } @@ -185,7 +185,7 @@ var _ = Describe("Operations", func() { Context("Maintainer", func() { const ( - jobTimeout = 1 * time.Second + actionTimeout = 1 * time.Second cleanupInterval = 2 * time.Second operationExpiration = 2 * time.Second ) @@ -194,10 +194,10 @@ var _ = Describe("Operations", func() { postHookWithOperationsConfig := func() func(e env.Environment, servers map[string]common.FakeServer) { return func(e env.Environment, servers map[string]common.FakeServer) { - e.Set("operations.job_timeout", jobTimeout) - e.Set("operations.mark_orphans_interval", jobTimeout) + e.Set("operations.action_timeout", actionTimeout) e.Set("operations.cleanup_interval", cleanupInterval) - e.Set("operations.expiration_time", operationExpiration) + e.Set("operations.lifespan", operationExpiration) + e.Set("operations.reconciliation_operation_timeout", 9999*time.Hour) } } @@ -215,8 +215,7 @@ var _ = Describe("Operations", func() { When("Specified cleanup interval passes", func() { Context("operation platform is service Manager", func() { - - It("Does not delete operations older than that interval", func() { + It("Deletes operations older than that interval", func() { ctx.SMWithOAuth.DELETE(web.ServiceBrokersURL+"/non-existent-broker-id").WithQuery("async", true). Expect(). 
Status(http.StatusAccepted) @@ -224,8 +223,12 @@ byPlatformID := query.ByField(query.EqualsOperator, "platform_id", types.SMPlatform) assertOperationCount(2, byPlatformID) - time.Sleep(cleanupInterval + time.Second) - assertOperationCount(2, byPlatformID) + Eventually(func() int { + count, err := ctx.SMRepository.Count(context.Background(), types.OperationType, byPlatformID) + Expect(err).To(BeNil()) + + return count + }, cleanupInterval*2).Should(Equal(0)) }) }) @@ -287,6 +290,40 @@ }, cleanupInterval*2).Should(Equal(0)) }) }) + + Context("with external operations for Service Manager", func() { + BeforeEach(func() { + operation := &types.Operation{ + Base: types.Base{ + ID: defaultOperationID, + UpdatedAt: time.Now().Add(-cleanupInterval + time.Second), + Labels: make(map[string][]string), + Ready: true, + }, + Reschedule: false, + Type: types.CREATE, + State: types.IN_PROGRESS, + ResourceID: "test-resource-id", + ResourceType: web.ServiceBrokersURL, + PlatformID: "cloudfoundry", + CorrelationID: "test-correlation-id", + } + object, err := ctx.SMRepository.Create(context.Background(), operation) + Expect(err).To(BeNil()) + Expect(object).To(Not(BeNil())) + }) + + It("should cleanup old external operations", func() { + byPlatformID := query.ByField(query.EqualsOperator, "platform_id", types.SMPlatform) + assertOperationCount(1, byPlatformID) + Eventually(func() int { + count, err := ctx.SMRepository.Count(context.Background(), types.OperationType, byPlatformID) + Expect(err).To(BeNil()) + + return count + }, operationExpiration*2).Should(Equal(0)) + }) + }) }) When("Specified job timeout passes", func() { @@ -298,10 +335,12 @@ Labels: make(map[string][]string), Ready: true, }, + Reschedule: false, Type: types.CREATE, State: types.IN_PROGRESS, ResourceID: "test-resource-id", ResourceType: web.ServiceBrokersURL, + PlatformID: types.SMPlatform, CorrelationID: "test-correlation-id", } @@ -316,7 +355,7 @@ op := object.(*types.Operation) return op.State - }, jobTimeout*5).Should(Equal(types.FAILED)) + }, actionTimeout*5).Should(Equal(types.FAILED)) }) }) }) diff --git a/test/platform_test/platform_test.go b/test/platform_test/platform_test.go index 4c359d3de..480fba21b 100644 --- a/test/platform_test/platform_test.go +++ b/test/platform_test/platform_test.go @@ -60,7 +60,7 @@ var _ = test.DescribeTestsFor(test.TestCase{ ResourceBlueprint: blueprint(true), ResourceWithoutNullableFieldsBlueprint: blueprint(false), PatchResource: test.APIResourcePatch, - AdditionalTests: func(ctx *common.TestContext) { + AdditionalTests: func(ctx *common.TestContext, t *test.TestCase) { Context("non-generic tests", func() { BeforeEach(func() { common.RemoveAllPlatforms(ctx.SMRepository) diff --git a/test/service_binding_test/service_binding_test.go b/test/service_binding_test/service_binding_test.go index 5edb2aded..bd9f76345 100644 --- a/test/service_binding_test/service_binding_test.go +++ b/test/service_binding_test/service_binding_test.go @@ -17,9 +17,14 @@ package service_binding_test import ( + "context" "fmt" + "github.com/Peripli/service-manager/operations" + "github.com/Peripli/service-manager/pkg/env" + "github.com/Peripli/service-manager/pkg/query" "net/http" "strconv" + "sync/atomic" "time" "github.com/gofrs/uuid" @@ -78,7 +83,7 @@ var _ = DescribeTestsFor(TestCase{ ResourceWithoutNullableFieldsBlueprint: blueprint, ResourcePropertiesToIgnore: 
[]string{"volume_mounts", "endpoints", "bind_resource", "credentials"}, PatchResource: StorageResourcePatch, - AdditionalTests: func(ctx *TestContext) { + AdditionalTests: func(ctx *TestContext, t *TestCase) { Context("additional non-generic tests", func() { var ( postBindingRequest Object @@ -96,6 +101,7 @@ var _ = DescribeTestsFor(TestCase{ expectedCreateSuccessStatusCode int expectedDeleteSuccessStatusCode int expectedBrokerFailureStatusCode int + expectedSMCrashStatusCode int } testCases := []testCase{ @@ -104,12 +110,14 @@ var _ = DescribeTestsFor(TestCase{ expectedCreateSuccessStatusCode: http.StatusCreated, expectedDeleteSuccessStatusCode: http.StatusOK, expectedBrokerFailureStatusCode: http.StatusBadGateway, + expectedSMCrashStatusCode: http.StatusBadGateway, }, { async: true, expectedCreateSuccessStatusCode: http.StatusAccepted, expectedDeleteSuccessStatusCode: http.StatusAccepted, expectedBrokerFailureStatusCode: http.StatusAccepted, + expectedSMCrashStatusCode: http.StatusAccepted, }, } @@ -594,6 +602,71 @@ var _ = DescribeTestsFor(TestCase{ }) }) + XWhen("SM crashes after storing operation before storing resource", func() { + var newCtx *TestContext + + postHookWithShutdownTimeout := func() func(e env.Environment, servers map[string]FakeServer) { + return func(e env.Environment, servers map[string]FakeServer) { + e.Set("server.shutdown_timeout", 1*time.Second) + e.Set("httpclient.response_header_timeout", 1*time.Second) + } + } + + BeforeEach(func() { + ctxMaintainerBuilder := t.ContextBuilder.WithEnvPostExtensions(postHookWithShutdownTimeout()) + newCtx = ctxMaintainerBuilder.BuildWithoutCleanup() + + brokerServer.BindingHandlerFunc(http.MethodPut, http.MethodPut+"3", func(_ *http.Request) (int, map[string]interface{}) { + defer newCtx.CleanupAll(false) + return http.StatusOK, Object{"state": types.IN_PROGRESS} + }) + + brokerServer.BindingHandlerFunc(http.MethodDelete, http.MethodDelete+"3", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) + brokerServer.BindingLastOpHandlerFunc(http.MethodDelete+"3", func(_ *http.Request) (int, map[string]interface{}) { + return http.StatusOK, Object{"state": types.SUCCEEDED} + }) + }) + + It("Should mark operation as failed and trigger orphan mitigation", func() { + opChan := make(chan *types.Operation) + defer close(opChan) + + opCriteria := []query.Criterion{ + query.ByField(query.EqualsOperator, "type", string(types.CREATE)), + query.ByField(query.EqualsOperator, "state", string(types.IN_PROGRESS)), + query.ByField(query.EqualsOperator, "resource_type", string(types.ServiceBindingType)), + query.ByField(query.EqualsOperator, "reschedule", "false"), + query.ByField(query.EqualsOperator, "deletion_scheduled", operations.ZeroTime), + } + + go func() { + for { + object, err := ctx.SMRepository.Get(context.TODO(), types.OperationType, opCriteria...) 
+ if err == nil { + opChan <- object.(*types.Operation) + break + } + } + }() + + createBinding(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedSMCrashStatusCode) + operation := <-opChan + + verifyBindingDoesNotExist(ctx.SMWithOAuthForTenant, operation.ResourceID) + + operationExpectation := OperationExpectations{ + Category: types.CREATE, + State: types.FAILED, + ResourceType: types.ServiceBindingType, + Reschedulable: false, + DeletionScheduled: false, + } + + bindingID, _ = VerifyOperationExists(ctx, fmt.Sprintf("%s/%s%s/%s", web.ServiceBindingsURL, operation.ResourceID, web.OperationsURL, operation.ID), operationExpectation) + verifyBindingDoesNotExist(ctx.SMWithOAuthForTenant, bindingID) + }) + }) + When("broker responds with synchronous success", func() { BeforeEach(func() { brokerServer.BindingHandlerFunc(http.MethodPut, http.MethodPut+"1", ParameterizedHandler(http.StatusCreated, syncBindingResponse)) @@ -635,12 +708,12 @@ var _ = DescribeTestsFor(TestCase{ }) if testCase.async { - When("job timeout is reached while polling", func() { + When("action timeout is reached while polling", func() { var oldCtx *TestContext BeforeEach(func() { oldCtx = ctx ctx = NewTestContextBuilderWithSecurity().WithEnvPreExtensions(func(set *pflag.FlagSet) { - Expect(set.Set("operations.job_timeout", (2 * time.Second).String())).ToNot(HaveOccurred()) + Expect(set.Set("operations.action_timeout", (2 * time.Second).String())).ToNot(HaveOccurred()) }).BuildWithoutCleanup() brokerServer.BindingHandlerFunc(http.MethodPut, http.MethodPut+"1", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) @@ -665,6 +738,57 @@ var _ = DescribeTestsFor(TestCase{ verifyBindingExists(ctx.SMWithOAuthForTenant, bindingID, false) }) }) + + XWhen("SM crashes while polling", func() { + var newCtx *TestContext + var isBound atomic.Value + + postHookWithShutdownTimeout := func() func(e env.Environment, servers map[string]FakeServer) { + return func(e env.Environment, servers map[string]FakeServer) { + e.Set("server.shutdown_timeout", 1*time.Second) + } + } + + BeforeEach(func() { + ctxMaintainerBuilder := t.ContextBuilder.WithEnvPostExtensions(postHookWithShutdownTimeout()) + newCtx = ctxMaintainerBuilder.BuildWithoutCleanup() + + brokerServer.BindingHandlerFunc(http.MethodPut, http.MethodPut+"1", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) + brokerServer.BindingLastOpHandlerFunc(http.MethodPut+"1", func(_ *http.Request) (int, map[string]interface{}) { + if isBound.Load() != nil { + return http.StatusOK, Object{"state": types.SUCCEEDED} + } else { + return http.StatusOK, Object{"state": types.IN_PROGRESS} + } + }) + + }) + + It("should restart polling through maintainer and eventually the binding is set to ready", func() { + resp := createBinding(newCtx.SMWithOAuthForTenant, true, http.StatusAccepted) + + operationExpectations := OperationExpectations{ + Category: types.CREATE, + State: types.IN_PROGRESS, + ResourceType: types.ServiceBindingType, + Reschedulable: true, + DeletionScheduled: false, + } + + bindingID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), operationExpectations) + verifyBindingExists(ctx.SMWithOAuthForTenant, bindingID, false) + + newCtx.CleanupAll(false) + + isBound.Store(true) + + operationExpectations.State = types.SUCCEEDED + operationExpectations.Reschedulable = false + + bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), operationExpectations) + verifyBindingExists(ctx.SMWithOAuthForTenant, bindingID, true) + }) + 
}) } When("polling responds with unexpected state and eventually with success state", func() { @@ -881,6 +1005,55 @@ var _ = DescribeTestsFor(TestCase{ }) } + XWhen("SM crashes while orphan mitigating", func() { + var newCtx *TestContext + var isUnbound atomic.Value + + postHookWithShutdownTimeout := func() func(e env.Environment, servers map[string]FakeServer) { + return func(e env.Environment, servers map[string]FakeServer) { + e.Set("server.shutdown_timeout", 1*time.Second) + } + } + + BeforeEach(func() { + ctxMaintainerBuilder := t.ContextBuilder.WithEnvPostExtensions(postHookWithShutdownTimeout()) + newCtx = ctxMaintainerBuilder.BuildWithoutCleanup() + + brokerServer.BindingHandlerFunc(http.MethodDelete, http.MethodDelete+"3", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) + brokerServer.BindingLastOpHandlerFunc(http.MethodDelete+"3", func(_ *http.Request) (int, map[string]interface{}) { + if isUnbound.Load() != nil { + return http.StatusOK, Object{"state": types.SUCCEEDED} + } else { + return http.StatusOK, Object{"state": types.IN_PROGRESS} + } + }) + }) + + It("should restart orphan mitigation through maintainer and eventually succeeds", func() { + resp := createBinding(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode) + + operationExpectations := OperationExpectations{ + Category: types.CREATE, + State: types.FAILED, + ResourceType: types.ServiceBindingType, + Reschedulable: true, + DeletionScheduled: true, + } + + bindingID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), operationExpectations) + + newCtx.CleanupAll(false) + isUnbound.Store(true) + + operationExpectations.DeletionScheduled = false + operationExpectations.Reschedulable = false + + bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), operationExpectations) + verifyBindingDoesNotExist(ctx.SMWithOAuthForTenant, bindingID) + }) + + }) + When("broker orphan mitigation unbind asynchronously fails with an error that will continue further orphan mitigation and eventually succeed", func() { BeforeEach(func() { brokerServer.BindingHandlerFunc(http.MethodDelete, http.MethodDelete+"3", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) @@ -1153,6 +1326,58 @@ var _ = DescribeTestsFor(TestCase{ verifyBindingDoesNotExist(ctx.SMWithOAuthForTenant, bindingID) }) + if testCase.async { + XWhen("SM crashes while polling", func() { + var newCtx *TestContext + var isBound atomic.Value + + postHookWithShutdownTimeout := func() func(e env.Environment, servers map[string]FakeServer) { + return func(e env.Environment, servers map[string]FakeServer) { + e.Set("server.shutdown_timeout", 1*time.Second) + } + } + + BeforeEach(func() { + ctxMaintainerBuilder := t.ContextBuilder.WithEnvPostExtensions(postHookWithShutdownTimeout()) + newCtx = ctxMaintainerBuilder.BuildWithoutCleanup() + + brokerServer.BindingHandlerFunc(http.MethodDelete, http.MethodDelete+"1", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) + brokerServer.BindingLastOpHandlerFunc(http.MethodDelete+"1", func(_ *http.Request) (int, map[string]interface{}) { + if isBound.Load() != nil { + return http.StatusOK, Object{"state": types.SUCCEEDED} + } else { + return http.StatusOK, Object{"state": types.IN_PROGRESS} + } + }) + + }) + + It("should start restart polling through maintainer and eventually binding is set to ready", func() { + resp := deleteBinding(newCtx.SMWithOAuthForTenant, true, http.StatusAccepted) + + operationExpectations := 
OperationExpectations{ + Category: types.DELETE, + State: types.IN_PROGRESS, + ResourceType: types.ServiceBindingType, + Reschedulable: true, + DeletionScheduled: false, + } + + bindingID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), operationExpectations) + verifyBindingExists(ctx.SMWithOAuthForTenant, bindingID, true) + + newCtx.CleanupAll(false) + isBound.Store(true) + + operationExpectations.State = types.SUCCEEDED + operationExpectations.Reschedulable = false + + bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), operationExpectations) + verifyBindingDoesNotExist(ctx.SMWithOAuthForTenant, bindingID) + }) + }) + } + When("polling responds 410 GONE", func() { BeforeEach(func() { brokerServer.BindingHandlerFunc(http.MethodDelete, http.MethodDelete+"1", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) diff --git a/test/service_instance_test/service_instance_test.go b/test/service_instance_test/service_instance_test.go index b2f5be080..2f8f463e1 100644 --- a/test/service_instance_test/service_instance_test.go +++ b/test/service_instance_test/service_instance_test.go @@ -17,7 +17,12 @@ package service_test import ( + "context" "fmt" + "github.com/Peripli/service-manager/operations" + "github.com/Peripli/service-manager/pkg/env" + "github.com/Peripli/service-manager/pkg/query" + "sync/atomic" "time" "github.com/tidwall/gjson" @@ -80,7 +85,7 @@ var _ = DescribeTestsFor(TestCase{ ResourceWithoutNullableFieldsBlueprint: blueprint, ResourcePropertiesToIgnore: []string{"platform_id"}, PatchResource: APIResourcePatch, - AdditionalTests: func(ctx *TestContext) { + AdditionalTests: func(ctx *TestContext, t *TestCase) { Context("additional non-generic tests", func() { var ( postInstanceRequest Object @@ -97,6 +102,7 @@ var _ = DescribeTestsFor(TestCase{ expectedCreateSuccessStatusCode int expectedDeleteSuccessStatusCode int expectedBrokerFailureStatusCode int + expectedSMCrashStatusCode int } testCases := []testCase{ @@ -105,12 +111,14 @@ var _ = DescribeTestsFor(TestCase{ expectedCreateSuccessStatusCode: http.StatusCreated, expectedDeleteSuccessStatusCode: http.StatusOK, expectedBrokerFailureStatusCode: http.StatusBadGateway, + expectedSMCrashStatusCode: http.StatusBadGateway, }, { async: true, expectedCreateSuccessStatusCode: http.StatusAccepted, expectedDeleteSuccessStatusCode: http.StatusAccepted, expectedBrokerFailureStatusCode: http.StatusAccepted, + expectedSMCrashStatusCode: http.StatusAccepted, }, } @@ -144,7 +152,7 @@ var _ = DescribeTestsFor(TestCase{ Status(expectedStatusCode) } - verifyInstanceExists := func(instanceID string, ready bool) { + verifyInstanceExists := func(ctx *TestContext, instanceID string, ready bool) { timeoutDuration := 15 * time.Second tickerInterval := 100 * time.Millisecond ticker := time.NewTicker(tickerInterval) @@ -325,7 +333,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) } @@ -482,7 +490,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, false) + verifyInstanceExists(ctx, instanceID, false) }) AfterEach(func() { @@ -520,6 +528,71 @@ var _ = DescribeTestsFor(TestCase{ }) }) + XWhen("SM crashes after storing operation before storing resource", func() { + var newCtx *TestContext + + postHookWithShutdownTimeout := func() func(e env.Environment, servers map[string]FakeServer) { + return func(e env.Environment, servers map[string]FakeServer) { + 
e.Set("server.shutdown_timeout", 1*time.Second) + e.Set("httpclient.response_header_timeout", 1*time.Second) + } + } + + BeforeEach(func() { + ctxMaintainerBuilder := t.ContextBuilder.WithEnvPostExtensions(postHookWithShutdownTimeout()) + newCtx = ctxMaintainerBuilder.BuildWithoutCleanup() + + brokerServer.ServiceInstanceHandlerFunc(http.MethodPut, http.MethodPut+"3", func(_ *http.Request) (int, map[string]interface{}) { + defer newCtx.CleanupAll(false) + return http.StatusOK, Object{"state": "in progress"} + }) + + brokerServer.ServiceInstanceHandlerFunc(http.MethodDelete, http.MethodDelete+"3", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) + brokerServer.ServiceInstanceLastOpHandlerFunc(http.MethodDelete+"3", func(_ *http.Request) (int, map[string]interface{}) { + return http.StatusOK, Object{"state": "succeeded"} + }) + }) + + It("Should mark operation as failed and trigger orphan mitigation", func() { + opChan := make(chan *types.Operation) + defer close(opChan) + + opCriteria := []query.Criterion{ + query.ByField(query.EqualsOperator, "type", string(types.CREATE)), + query.ByField(query.EqualsOperator, "state", string(types.IN_PROGRESS)), + query.ByField(query.EqualsOperator, "resource_type", string(types.ServiceInstanceType)), + query.ByField(query.EqualsOperator, "reschedule", "false"), + query.ByField(query.EqualsOperator, "deletion_scheduled", operations.ZeroTime), + } + + go func() { + for { + object, err := ctx.SMRepository.Get(context.TODO(), types.OperationType, opCriteria...) + if err == nil { + opChan <- object.(*types.Operation) + break + } + } + }() + + createInstanceWithAsync(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedSMCrashStatusCode) + operation := <-opChan + + verifyInstanceDoesNotExist(operation.ResourceID) + + operationExpectation := OperationExpectations{ + Category: types.CREATE, + State: types.FAILED, + ResourceType: types.ServiceInstanceType, + Reschedulable: false, + DeletionScheduled: false, + } + + instanceID, _ = VerifyOperationExists(ctx, fmt.Sprintf("%s/%s%s/%s", web.ServiceInstancesURL, operation.ResourceID, web.OperationsURL, operation.ID), operationExpectation) + verifyInstanceDoesNotExist(instanceID) + }) + }) + When("broker responds with synchronous success", func() { BeforeEach(func() { brokerServer.ServiceInstanceHandlerFunc(http.MethodPut, http.MethodPut+"1", ParameterizedHandler(http.StatusCreated, Object{"async": false})) @@ -536,7 +609,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) @@ -557,17 +630,17 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) if testCase.async { - When("job timeout is reached while polling", func() { + When("action timeout is reached while polling", func() { var oldCtx *TestContext BeforeEach(func() { oldCtx = ctx ctx = NewTestContextBuilderWithSecurity().WithEnvPreExtensions(func(set *pflag.FlagSet) { - Expect(set.Set("operations.job_timeout", (2 * time.Second).String())).ToNot(HaveOccurred()) + Expect(set.Set("operations.action_timeout", (2 * time.Second).String())).ToNot(HaveOccurred()) }).BuildWithoutCleanup() brokerServer.ServiceInstanceHandlerFunc(http.MethodPut, http.MethodPut+"1", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) @@ -589,7 +662,56 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - 
verifyInstanceExists(instanceID, false) + verifyInstanceExists(ctx, instanceID, false) + }) + }) + + XWhen("SM crashes while polling", func() { + var newCtx *TestContext + var isProvisioned atomic.Value + + postHookWithShutdownTimeout := func() func(e env.Environment, servers map[string]FakeServer) { + return func(e env.Environment, servers map[string]FakeServer) { + e.Set("server.shutdown_timeout", 1*time.Second) + } + } + + BeforeEach(func() { + ctxMaintainerBuilder := t.ContextBuilder.WithEnvPostExtensions(postHookWithShutdownTimeout()) + newCtx = ctxMaintainerBuilder.BuildWithoutCleanup() + + brokerServer.ServiceInstanceLastOpHandlerFunc(http.MethodPut+"1", func(_ *http.Request) (int, map[string]interface{}) { + if isProvisioned.Load() != nil { + return http.StatusOK, Object{"state": types.SUCCEEDED} + } else { + return http.StatusOK, Object{"state": types.IN_PROGRESS} + } + }) + }) + + It("should restart polling through maintainer and eventually the instance is set to ready", func() { + resp := createInstanceWithAsync(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedCreateSuccessStatusCode) + + operationExpectation := OperationExpectations{ + Category: types.CREATE, + State: types.IN_PROGRESS, + ResourceType: types.ServiceInstanceType, + Reschedulable: true, + DeletionScheduled: false, + } + + instanceID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), operationExpectation) + verifyInstanceExists(newCtx, instanceID, false) + + newCtx.CleanupAll(false) + + isProvisioned.Store(true) + + operationExpectation.State = types.SUCCEEDED + operationExpectation.Reschedulable = false + + instanceID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), operationExpectation) + verifyInstanceExists(ctx, instanceID, true) }) }) } @@ -610,7 +732,7 @@ var _ = DescribeTestsFor(TestCase{ Reschedulable: false, DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) @@ -660,7 +782,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: true, }) - verifyInstanceExists(instanceID, false) + verifyInstanceExists(ctx, instanceID, false) }) }) @@ -705,7 +827,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, false) + verifyInstanceExists(ctx, instanceID, false) }) }) }) @@ -785,7 +907,7 @@ var _ = DescribeTestsFor(TestCase{ BeforeEach(func() { oldCtx = ctx ctx = NewTestContextBuilderWithSecurity().WithEnvPreExtensions(func(set *pflag.FlagSet) { - Expect(set.Set("operations.scheduled_deletion_timeout", (2 * time.Millisecond).String())).ToNot(HaveOccurred()) + Expect(set.Set("operations.reconciliation_operation_timeout", (2 * time.Millisecond).String())).ToNot(HaveOccurred()) }).BuildWithoutCleanup() }) @@ -804,7 +926,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: true, }) - verifyInstanceExists(instanceID, false) + verifyInstanceExists(ctx, instanceID, false) }) }) }) @@ -827,11 +949,61 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: true, }) - verifyInstanceExists(instanceID, false) + verifyInstanceExists(ctx, instanceID, false) }) }) } + XWhen("SM crashes while orphan mitigating", func() { + var newCtx *TestContext + var isDeprovisioned atomic.Value + + postHookWithShutdownTimeout := func() func(e env.Environment, servers map[string]FakeServer) { + return func(e env.Environment, servers map[string]FakeServer) { + e.Set("server.shutdown_timeout", 1*time.Second) + } + } + + BeforeEach(func() { + ctxMaintainerBuilder := 
t.ContextBuilder.WithEnvPostExtensions(postHookWithShutdownTimeout()) + newCtx = ctxMaintainerBuilder.BuildWithoutCleanup() + + brokerServer.ServiceInstanceHandlerFunc(http.MethodDelete, http.MethodDelete+"3", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) + brokerServer.ServiceInstanceLastOpHandlerFunc(http.MethodDelete+"3", func(_ *http.Request) (int, map[string]interface{}) { + if isDeprovisioned.Load() != nil { + return http.StatusOK, Object{"state": "succeeded"} + } else { + return http.StatusOK, Object{"state": "in progress"} + } + }) + }) + + It("should restart orphan mitigation through maintainer and eventually succeeds", func() { + resp := createInstanceWithAsync(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode) + + operationExpectations := OperationExpectations{ + Category: types.CREATE, + State: types.FAILED, + ResourceType: types.ServiceInstanceType, + Reschedulable: true, + DeletionScheduled: true, + } + + instanceID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), operationExpectations) + + newCtx.CleanupAll(false) + + isDeprovisioned.Store(true) + + operationExpectations.DeletionScheduled = false + operationExpectations.Reschedulable = false + instanceID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), operationExpectations) + + verifyInstanceDoesNotExist(instanceID) + }) + + }) + When("broker orphan mitigation deprovision asynchronously fails with an error that will continue further orphan mitigation and eventually succeed", func() { BeforeEach(func() { brokerServer.ServiceInstanceHandlerFunc(http.MethodDelete, http.MethodDelete+"3", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) @@ -858,7 +1030,7 @@ var _ = DescribeTestsFor(TestCase{ }) }) - When("provision responds with error due to times out", func() { + When("provision responds with error due to time out", func() { var doneChannel chan interface{} var oldCtx *TestContext @@ -1213,7 +1385,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) When("a delete operation is already in progress", func() { @@ -1234,7 +1406,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) AfterEach(func() { @@ -1284,7 +1456,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) @@ -1348,13 +1520,65 @@ var _ = DescribeTestsFor(TestCase{ verifyInstanceDoesNotExist(instanceID) }) + if testCase.async { + XWhen("SM crashes while polling", func() { + var newCtx *TestContext + var isDeprovisioned atomic.Value + + postHookWithShutdownTimeout := func() func(e env.Environment, servers map[string]FakeServer) { + return func(e env.Environment, servers map[string]FakeServer) { + e.Set("server.shutdown_timeout", 1*time.Second) + } + } + + BeforeEach(func() { + ctxMaintainerBuilder := t.ContextBuilder.WithEnvPostExtensions(postHookWithShutdownTimeout()) + newCtx = ctxMaintainerBuilder.BuildWithoutCleanup() + + brokerServer.ServiceInstanceLastOpHandlerFunc(http.MethodDelete+"1", func(_ *http.Request) (int, map[string]interface{}) { + if isDeprovisioned.Load() != nil { + return http.StatusOK, Object{"state": "succeeded"} + } else { + return http.StatusOK, Object{"state": "in progress"} + } + }) + }) + + It("should restart polling through 
maintainer and eventually deletes the instance", func() { + resp := deleteInstance(newCtx.SMWithOAuthForTenant, true, http.StatusAccepted) + + operationExpectations := OperationExpectations{ + Category: types.DELETE, + State: types.IN_PROGRESS, + ResourceType: types.ServiceInstanceType, + Reschedulable: true, + DeletionScheduled: false, + } + + instanceID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), operationExpectations) + verifyInstanceExists(newCtx, instanceID, true) + + newCtx.CleanupAll(false) + + isDeprovisioned.Store(true) + + operationExpectations.State = types.SUCCEEDED + operationExpectations.Reschedulable = false + + instanceID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), operationExpectations) + verifyInstanceDoesNotExist(instanceID) + + }) + }) + } + When("polling responds 410 GONE", func() { BeforeEach(func() { brokerServer.ServiceInstanceHandlerFunc(http.MethodDelete, http.MethodDelete+"1", ParameterizedHandler(http.StatusAccepted, Object{"async": true})) brokerServer.ServiceInstanceLastOpHandlerFunc(http.MethodDelete+"1", ParameterizedHandler(http.StatusGone, Object{})) }) - It("keeps polling and eventually deletes the binding and marks the operation as success", func() { + It("keeps polling and eventually deletes the instance and marks the operation as success", func() { resp := deleteInstance(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedDeleteSuccessStatusCode) instanceID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{ @@ -1444,7 +1668,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: true, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) @@ -1482,7 +1706,7 @@ var _ = DescribeTestsFor(TestCase{ BeforeEach(func() { oldCtx = ctx ctx = NewTestContextBuilderWithSecurity().WithEnvPreExtensions(func(set *pflag.FlagSet) { - Expect(set.Set("operations.scheduled_deletion_timeout", (2 * time.Millisecond).String())).ToNot(HaveOccurred()) + Expect(set.Set("operations.reconciliation_operation_timeout", (2 * time.Millisecond).String())).ToNot(HaveOccurred()) }).BuildWithoutCleanup() }) @@ -1501,7 +1725,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: true, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) }) @@ -1523,7 +1747,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) }) @@ -1545,7 +1769,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) @@ -1564,7 +1788,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: false, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) @@ -1624,7 +1848,7 @@ var _ = DescribeTestsFor(TestCase{ DeletionScheduled: true, }) - verifyInstanceExists(instanceID, true) + verifyInstanceExists(ctx, instanceID, true) }) }) } @@ -1661,25 +1885,29 @@ var _ = DescribeTestsFor(TestCase{ }) When("deprovision responds with error due to times out", func() { + var newCtx *TestContext var doneChannel chan interface{} BeforeEach(func() { doneChannel = make(chan interface{}) - ctx = NewTestContextBuilderWithSecurity().WithEnvPreExtensions(func(set *pflag.FlagSet) { + newCtx = t.ContextBuilder.WithEnvPreExtensions(func(set *pflag.FlagSet) { Expect(set.Set("httpclient.response_header_timeout", (1 * 
time.Second).String())).ToNot(HaveOccurred()) }).BuildWithoutCleanup() brokerServer.ServiceInstanceHandlerFunc(http.MethodDelete, http.MethodDelete+"1", DelayingHandler(doneChannel)) + }) + AfterEach(func() { + newCtx.CleanupAll(false) }) It("orphan mitigates the instance", func() { - resp := deleteInstance(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode) + resp := deleteInstance(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode) <-time.After(1100 * time.Millisecond) close(doneChannel) - instanceID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{ + instanceID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), OperationExpectations{ Category: types.DELETE, State: types.FAILED, ResourceType: types.ServiceInstanceType, @@ -1689,7 +1917,7 @@ var _ = DescribeTestsFor(TestCase{ brokerServer.ServiceInstanceHandlerFunc(http.MethodDelete, http.MethodDelete+"1", ParameterizedHandler(http.StatusOK, Object{"async": false})) - instanceID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{ + instanceID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), OperationExpectations{ Category: types.DELETE, State: types.SUCCEEDED, ResourceType: types.ServiceInstanceType, diff --git a/test/service_offering_test/service_offering_test.go b/test/service_offering_test/service_offering_test.go index 9bfb86fa6..dfcefba7b 100644 --- a/test/service_offering_test/service_offering_test.go +++ b/test/service_offering_test/service_offering_test.go @@ -52,7 +52,7 @@ var _ = test.DescribeTestsFor(test.TestCase{ ResourceBlueprint: blueprint, ResourceWithoutNullableFieldsBlueprint: blueprint, PatchResource: test.APIResourcePatch, - AdditionalTests: func(ctx *common.TestContext) { + AdditionalTests: func(ctx *common.TestContext, t *test.TestCase) { Context("additional non-generic tests", func() { Describe("PATCH", func() { var id string diff --git a/test/service_plan_test/service_plan_test.go b/test/service_plan_test/service_plan_test.go index 029384dd4..39aa2156f 100644 --- a/test/service_plan_test/service_plan_test.go +++ b/test/service_plan_test/service_plan_test.go @@ -48,7 +48,7 @@ var _ = test.DescribeTestsFor(test.TestCase{ ResourceBlueprint: blueprint, ResourceWithoutNullableFieldsBlueprint: blueprint, PatchResource: test.APIResourcePatch, - AdditionalTests: func(ctx *common.TestContext) { + AdditionalTests: func(ctx *common.TestContext, t *test.TestCase) { Context("additional non-generic tests", func() { Describe("PATCH", func() { var id string diff --git a/test/test.go b/test/test.go index 0cca3774d..4726c0ecc 100644 --- a/test/test.go +++ b/test/test.go @@ -62,8 +62,21 @@ const ( Sync ResponseMode = false Async ResponseMode = true + + JobTimeout = 15 * time.Second + cleanupInterval = 60 * time.Second + operationExpiration = 60 * time.Second ) +func postHookWithOperationsConfig() func(e env.Environment, servers map[string]common.FakeServer) { + return func(e env.Environment, servers map[string]common.FakeServer) { + e.Set("operations.action_timeout", JobTimeout) + e.Set("operations.cleanup_interval", cleanupInterval) + e.Set("operations.lifespan", operationExpiration) + e.Set("operations.reconciliation_operation_timeout", 9999*time.Hour) + } +} + type MultitenancySettings struct { ClientID string ClientIDTokenClaim string @@ -88,7 +101,8 @@ type TestCase struct { ResourceWithoutNullableFieldsBlueprint func(ctx *common.TestContext, smClient 
*common.SMExpect, async bool) common.Object PatchResource func(ctx *common.TestContext, tenantScoped bool, apiPath string, objID string, resourceType types.ObjectType, patchLabels []*query.LabelChange, async bool) - AdditionalTests func(ctx *common.TestContext) + AdditionalTests func(ctx *common.TestContext, t *TestCase) + ContextBuilder *common.TestContextBuilder } func stripObject(obj common.Object, properties ...string) { @@ -237,11 +251,8 @@ func DescribeTestsFor(t TestCase) bool { ctx.Cleanup() }) - func() { - By("==== Preparation for SM tests... ====") - - defer GinkgoRecover() - ctxBuilder := common.NewTestContextBuilderWithSecurity() + ctxBuilder := func() *common.TestContextBuilder { + ctxBuilder := common.NewTestContextBuilderWithSecurity().WithEnvPostExtensions(postHookWithOperationsConfig()) if t.MultitenancySettings != nil { ctxBuilder. @@ -268,7 +279,16 @@ func DescribeTestsFor(t TestCase) bool { return nil }) } - ctx = ctxBuilder.Build() + return ctxBuilder + } + + t.ContextBuilder = ctxBuilder() + + func() { + By("==== Preparation for SM tests... ====") + + defer GinkgoRecover() + ctx = ctxBuilder().Build() // A panic outside of Ginkgo's primitives (during test setup) would be recovered // by the deferred GinkgoRecover() and the error will be associated with the first @@ -307,7 +327,7 @@ func DescribeTestsFor(t TestCase) bool { } if t.AdditionalTests != nil { - t.AdditionalTests(ctx) + t.AdditionalTests(ctx, &t) } By("==== Successfully finished preparation for SM tests. Running API tests suite... ====") diff --git a/test/visibility_test/visibility_test.go b/test/visibility_test/visibility_test.go index 6ba91c5d5..2bd4d2c33 100644 --- a/test/visibility_test/visibility_test.go +++ b/test/visibility_test/visibility_test.go @@ -48,7 +48,7 @@ var _ = test.DescribeTestsFor(test.TestCase{ ResourceBlueprint: blueprint(true), ResourceWithoutNullableFieldsBlueprint: blueprint(false), PatchResource: test.APIResourcePatch, - AdditionalTests: func(ctx *common.TestContext) { + AdditionalTests: func(ctx *common.TestContext, t *test.TestCase) { Context("non-generic tests", func() { var ( existingPlatformID string