Skip to content

Commit

Permalink
Support delete for async_not_defined mode (#547)
Browse files Browse the repository at this point in the history
Finalize 'neutral' mode (not explicitly sync/async) for delete operations

- Support delete for async_not_defined
- Move the scheduling logic to the scheduler
- Confirm API supports async mode before scheduling a storage action
  • Loading branch information
i500866 authored Aug 11, 2020
1 parent 8e269b7 commit 4a19d22
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 103 deletions.
84 changes: 19 additions & 65 deletions api/base_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,25 +196,13 @@ func (c *BaseController) CreateObject(r *web.Request) (*web.Response, error) {
Context: c.prepareOperationContextByRequest(r),
}

var createdObj types.Object
if operation.Context.IsAsyncNotDefined {
log.C(ctx).Debugf("Request will be executed by broker response")
createdObj, err = c.scheduler.ScheduleStorageAction(ctx, operation, action)
if err != nil {
return nil, err
}
if createdObj == nil {
return util.NewLocationResponse(operation.GetID(), operation.ResourceID, c.resourceBaseURL)
}
} else if operation.Context.Async {
log.C(ctx).Debugf("Request will be executed asynchronously")
return c.executeAsync(ctx, operation, action, result.GetID())
} else {
log.C(ctx).Debugf("Request will be executed synchronously")
createdObj, err = c.scheduler.ScheduleSyncStorageAction(ctx, operation, action)
if err != nil {
return nil, util.HandleStorageError(err, c.objectType.String())
}
createdObj, isAsync, err := c.scheduler.ScheduleStorageAction(ctx, operation, action, c.supportsAsync)
if err != nil {
return nil, err
}

if isAsync {
return util.NewLocationResponse(operation.GetID(), operation.ResourceID, c.resourceBaseURL)
}

if err := attachLastOperation(ctx, createdObj.GetID(), createdObj, c.repository); err != nil {
Expand All @@ -225,16 +213,6 @@ func (c *BaseController) CreateObject(r *web.Request) (*web.Response, error) {
return util.NewJSONResponse(http.StatusCreated, createdObj)
}

func (c *BaseController) executeAsync(ctx context.Context, operation *types.Operation, action func(ctx context.Context, repository storage.Repository) (types.Object, error), resourceID string) (*web.Response, error) {
if err := c.checkAsyncSupport(); err != nil {
return nil, err
}
if err := c.scheduler.ScheduleAsyncStorageAction(ctx, operation, action); err != nil {
return nil, err
}
return util.NewLocationResponse(operation.GetID(), resourceID, c.resourceBaseURL)
}

// DeleteObjects handles the deletion of the objects specified in the request
func (c *BaseController) DeleteObjects(r *web.Request) (*web.Response, error) {
ctx := r.Context()
Expand Down Expand Up @@ -299,14 +277,13 @@ func (c *BaseController) DeleteSingleObject(r *web.Request) (*web.Response, erro
Context: c.prepareOperationContextByRequest(r),
}

if operation.Context.Async {
log.C(ctx).Debugf("Request will be executed asynchronously due to client request async=true")
return c.executeAsync(ctx, operation, action, objectID)
_, isAsync, err := c.scheduler.ScheduleStorageAction(ctx, operation, action, c.supportsAsync)
if err != nil {
return nil, err
}

log.C(ctx).Debugf("Request will be executed synchronously")
if _, err := c.scheduler.ScheduleSyncStorageAction(ctx, operation, action); err != nil {
return nil, util.HandleStorageError(err, c.objectType.String())
if isAsync {
return util.NewLocationResponse(operation.GetID(), operation.ResourceID, c.resourceBaseURL)
}

return util.NewJSONResponse(http.StatusOK, map[string]string{})
Expand Down Expand Up @@ -496,25 +473,13 @@ func (c *BaseController) PatchObject(r *web.Request) (*web.Response, error) {
Context: c.prepareOperationContextByRequest(r),
}

var object types.Object
if operation.Context.IsAsyncNotDefined {
log.C(ctx).Debugf("Request will be executed by broker response")
object, err = c.scheduler.ScheduleStorageAction(ctx, operation, action)
if err != nil {
return nil, err
}
if object == nil {
return util.NewLocationResponse(operation.GetID(), operation.ResourceID, c.resourceBaseURL)
}
} else if operation.Context.Async {
log.C(ctx).Debugf("Request will be executed asynchronously")
return c.executeAsync(ctx, operation, action, objFromDB.GetID())
} else {
log.C(ctx).Debugf("Request will be executed synchronously")
object, err = c.scheduler.ScheduleSyncStorageAction(ctx, operation, action)
if err != nil {
return nil, util.HandleStorageError(err, c.objectType.String())
}
object, isAsync, err := c.scheduler.ScheduleStorageAction(ctx, operation, action, c.supportsAsync)
if err != nil {
return nil, err
}

if isAsync {
return util.NewLocationResponse(operation.GetID(), operation.ResourceID, c.resourceBaseURL)
}

if err := attachLastOperation(ctx, object.GetID(), object, c.repository); err != nil {
Expand Down Expand Up @@ -680,17 +645,6 @@ func (c *BaseController) prepareOperationContextByRequest(r *web.Request) *types
return operationContext
}

func (c *BaseController) checkAsyncSupport() error {
if !c.supportsAsync {
return &util.HTTPError{
ErrorType: "InvalidRequest",
Description: fmt.Sprintf("requested %s api doesn't support asynchronous operations", c.objectType),
StatusCode: http.StatusBadRequest,
}
}
return nil
}

func generateTokenForItem(obj types.Object) string {
nextPageToken := obj.GetPagingSequence()
return base64.StdEncoding.EncodeToString([]byte(strconv.FormatInt(nextPageToken, 10)))
Expand Down
52 changes: 40 additions & 12 deletions operations/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,53 @@ func NewScheduler(smCtx context.Context, repository storage.TransactionalReposit
}
}

func (s *Scheduler) ScheduleStorageAction(ctx context.Context, operation *types.Operation, action storageAction) (types.Object, error) {
createdObj, err := s.ScheduleSyncStorageAction(ctx, operation, action)
//Identifies the preferred execution mode and execute the storage action
func (s *Scheduler) ScheduleStorageAction(ctx context.Context, operation *types.Operation, action storageAction, isAsyncSupported bool) (types.Object, bool, error) {
var object types.Object
var err error

if err != nil {
return nil, err
}
if operation.Context.IsAsyncNotDefined && isAsyncSupported {
object, err = s.ScheduleSyncStorageAction(ctx, operation, action)

lastOperation, _, _, err := s.getResourceLastOperation(ctx, operation)
if err != nil {
return nil, err
if err != nil {
return nil, false, err
}

lastOperation, _, _, err := s.getResourceLastOperation(ctx, operation)
if err != nil {
return nil, false, err
}

if lastOperation.Reschedule {
if err := s.ScheduleAsyncStorageAction(ctx, operation, action); err != nil {
return nil, false, err
}
return nil, true, nil
}

return object, false, err
}

if lastOperation.Reschedule {
if operation.Context.Async {

if !isAsyncSupported {
return nil, false, &util.HTTPError{
ErrorType: "InvalidRequest",
Description: fmt.Sprintf("requested api doesn't support asynchronous operations"),
StatusCode: http.StatusBadRequest,
}
}

log.C(ctx).Debugf("Request will be executed asynchronously")
if err := s.ScheduleAsyncStorageAction(ctx, operation, action); err != nil {
return nil, err
return nil, true, err
}
return nil, nil
return nil, true, nil
}
return createdObj, nil

log.C(ctx).Debugf("Request will be executed synchronously")
object, err = s.ScheduleSyncStorageAction(ctx, operation, action)
return object, false, err
}

// ScheduleSyncStorageAction stores the job's Operation entity in DB and synchronously executes the CREATE/UPDATE/DELETE DB transaction
Expand Down
18 changes: 16 additions & 2 deletions storage/interceptors/smaap_service_binding_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (i *ServiceBindingInterceptor) AroundTxDelete(f storage.InterceptDeleteArou
return err
}
} else if operation.InOrphanMitigationState() && operation.Context.ServiceInstanceID != "" {
// In case we don't have instance & use the operation context to perform orphan mitigation
// In case we don't have a binding & use the operation context to perform orphan mitigation
binding := types.ServiceBinding{}
binding.ServiceInstanceID = operation.Context.ServiceInstanceID
binding.ID = operation.ResourceID
Expand Down Expand Up @@ -295,6 +295,10 @@ func shouldRemoveResource(operation *types.Operation) bool {
return false
}
}
// Keep the instance in case the operation type is "DELETE" and the operation is rescheduled
if operation.Type == types.DELETE && operation.Reschedule {
return false
}
return true
}

Expand All @@ -309,6 +313,13 @@ func (i *ServiceBindingInterceptor) deleteSingleBinding(ctx context.Context, bin
return err
}

if operation.Reschedule {
if err := i.pollServiceBinding(ctx, osbClient, binding, instance, plan, operation, broker.ID, service.CatalogID, plan.CatalogID, operation.ExternalID, true); err != nil {
return err
}
return nil
}

var unbindResponse *osbc.UnbindResponse
if !operation.Reschedule {
unbindRequest := prepareUnbindRequest(instance, binding, service.CatalogID, plan.CatalogID, service.BindingsRetrievable)
Expand Down Expand Up @@ -341,6 +352,9 @@ func (i *ServiceBindingInterceptor) deleteSingleBinding(ctx context.Context, bin
log.C(ctx).Infof("Successful asynchronous unbind request %s to broker %s returned response %s",
logUnbindRequest(unbindRequest), broker.Name, logUnbindResponse(unbindResponse))
operation.Reschedule = true
if operation.Context.IsAsyncNotDefined {
operation.Context.Async = true
}
if operation.RescheduleTimestamp.IsZero() {
operation.RescheduleTimestamp = time.Now()
}
Expand All @@ -357,7 +371,7 @@ func (i *ServiceBindingInterceptor) deleteSingleBinding(ctx context.Context, bin
}
}

if operation.Reschedule {
if shouldStartPolling(operation) {
if err := i.pollServiceBinding(ctx, osbClient, binding, instance, plan, operation, broker.ID, service.CatalogID, plan.CatalogID, operation.ExternalID, true); err != nil {
return err
}
Expand Down
24 changes: 22 additions & 2 deletions storage/interceptors/smaap_service_instance_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,13 @@ func (i *ServiceInstanceInterceptor) deleteSingleInstance(ctx context.Context, i
log.C(ctx).Infof("Orphan mitigation in progress for instance with id %s and name %s triggered due to failure in operation %s", instance.ID, instance.Name, operation.Type)
}

if operation.Reschedule {
if err := i.pollServiceInstance(ctx, osbClient, instance, plan, operation, service.CatalogID, plan.CatalogID, true); err != nil {
return err
}
return nil
}

var deprovisionResponse *osbc.DeprovisionResponse
if !operation.Reschedule {
deprovisionRequest := prepareDeprovisionRequest(instance, service.CatalogID, plan.CatalogID)
Expand Down Expand Up @@ -461,6 +468,9 @@ func (i *ServiceInstanceInterceptor) deleteSingleInstance(ctx context.Context, i
log.C(ctx).Infof("Successful asynchronous deprovisioning request %s to broker %s returned response %s",
logDeprovisionRequest(deprovisionRequest), broker.Name, logDeprovisionResponse(deprovisionResponse))
operation.Reschedule = true
if operation.Context.IsAsyncNotDefined {
operation.Context.Async = true
}
if operation.RescheduleTimestamp.IsZero() {
operation.RescheduleTimestamp = time.Now()
}
Expand All @@ -477,7 +487,7 @@ func (i *ServiceInstanceInterceptor) deleteSingleInstance(ctx context.Context, i
}
}

if operation.Reschedule {
if shouldStartPolling(operation) {
if err := i.pollServiceInstance(ctx, osbClient, instance, plan, operation, service.CatalogID, plan.CatalogID, true); err != nil {
return err
}
Expand Down Expand Up @@ -609,7 +619,17 @@ func isUnreachableBroker(err error) bool {
}

func shouldStartPolling(operation *types.Operation) bool {
return !operation.Context.IsAsyncNotDefined && operation.Reschedule

// In case the operation not rescheduled, don't start polling
if !operation.Reschedule {
return false
}

if operation.Context != nil {
// The polling should start if the operation is rescheduled and (orphan mitigation state is in true state or async mode is defined)
return !operation.Context.IsAsyncNotDefined || operation.InOrphanMitigationState()
}
return true
}

func (i *ServiceInstanceInterceptor) processMaxPollingDurationElapsed(ctx context.Context, instance *types.ServiceInstance, plan *types.ServicePlan, operation *types.Operation, enableOrphanMitigation bool) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1743,7 +1743,7 @@ var _ = DescribeTestsFor(TestCase{
})

It("polling broker last operation until operation succeeds and eventually marks operation as success", func() {
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedDeleteSuccessStatusCode)
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedDeleteSuccessStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand Down Expand Up @@ -1792,7 +1792,7 @@ var _ = DescribeTestsFor(TestCase{

When("orphan mitigation unbind synchronously succeeds", func() {
It("deletes the binding and marks the operation as success", func() {
resp := deleteBinding(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode)
resp := deleteBinding(newCtx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedBrokerFailureStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand Down Expand Up @@ -1825,7 +1825,7 @@ var _ = DescribeTestsFor(TestCase{
})

It("keeps the binding and marks the operation with deletion scheduled", func() {
resp := deleteBinding(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode)
resp := deleteBinding(newCtx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedBrokerFailureStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand Down Expand Up @@ -1855,7 +1855,7 @@ var _ = DescribeTestsFor(TestCase{

When("broker orphan mitigation unbind synchronously fails with an error that will continue further orphan mitigation and eventually succeed", func() {
It("deletes the binding and marks the operation that triggered the orphan mitigation as failed with no deletion scheduled and not reschedulable", func() {
resp := deleteBinding(newCtx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode)
resp := deleteBinding(newCtx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedBrokerFailureStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(newCtx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand Down Expand Up @@ -1956,7 +1956,7 @@ var _ = DescribeTestsFor(TestCase{
})

It("keeps polling and eventually deletes the binding and marks the operation as success", func() {
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedDeleteSuccessStatusCode)
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedDeleteSuccessStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand All @@ -1980,7 +1980,7 @@ var _ = DescribeTestsFor(TestCase{
})

It("keeps polling and eventually deletes the binding and marks the operation as success", func() {
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedDeleteSuccessStatusCode)
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedDeleteSuccessStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand All @@ -2005,7 +2005,7 @@ var _ = DescribeTestsFor(TestCase{

When("orphan mitigation unbind synchronously succeeds", func() {
It("deletes the binding and marks the operation as success", func() {
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode)
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedBrokerFailureStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand Down Expand Up @@ -2038,7 +2038,7 @@ var _ = DescribeTestsFor(TestCase{
})

It("keeps the binding and marks the operation with deletion scheduled", func() {
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode)
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedBrokerFailureStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand Down Expand Up @@ -2068,7 +2068,7 @@ var _ = DescribeTestsFor(TestCase{

When("broker orphan mitigation unbind synchronously fails with an error that will continue further orphan mitigation and eventually succeed", func() {
It("deletes the binding and marks the operation that triggered the orphan mitigation as failed with no deletion scheduled and not reschedulable", func() {
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode)
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedBrokerFailureStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand Down Expand Up @@ -2106,7 +2106,7 @@ var _ = DescribeTestsFor(TestCase{
})

It("keeps the binding and stores the operation as reschedulable", func() {
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.expectedBrokerFailureStatusCode)
resp := deleteBinding(ctx.SMWithOAuthForTenant, testCase.async, testCase.responseByBrokerOrClientMode(testCase.expectedBrokerFailureStatusCode, http.StatusAccepted))

bindingID, _ = VerifyOperationExists(ctx, resp.Header("Location").Raw(), OperationExpectations{
Category: types.DELETE,
Expand Down
Loading

0 comments on commit 4a19d22

Please sign in to comment.