Skip to content

Commit

Permalink
Implemented optimistic locking for Deployments connector updates/patch.
Browse files Browse the repository at this point in the history
  • Loading branch information
valdar committed Apr 21, 2023
1 parent 8c8e09d commit 5877552
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
4 changes: 2 additions & 2 deletions internal/connector/internal/handlers/connector_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,11 @@ func (h *ConnectorAdminHandler) DeleteConnector(writer http.ResponseWriter, requ

// check force flag to force deletion of connector and deployments
if parseBoolParam(request.URL.Query().Get("force")) {
serviceError = h.ConnectorsService.ForceDelete(request.Context(), connectorId)
return nil, h.ConnectorsService.ForceDelete(request.Context(), connectorId)
} else {
ctx := request.Context()
return nil, HandleConnectorDelete(ctx, h.ConnectorsService, h.NamespaceService, connectorId)
}
return nil, serviceError
},
}

Expand Down Expand Up @@ -605,6 +604,7 @@ func (h *ConnectorAdminHandler) PatchConnectorDeployment(writer http.ResponseWri
// Handle the fields that support being updated...
var updatedDeployment dbapi.ConnectorDeployment
updatedDeployment.ID = existingDeployment.ID
updatedDeployment.Version = existingDeployment.Version
if len(resource.Spec.ShardMetadata) != 0 {
// channel update
updateRevision, err := workers.GetShardMetadataRevision(resource.Spec.ShardMetadata)
Expand Down
5 changes: 3 additions & 2 deletions internal/connector/internal/handlers/connector_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package handlers

import (
"context"
"strings"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi"
"k8s.io/apimachinery/pkg/util/validation"
"strings"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/arrays"

Expand Down Expand Up @@ -36,7 +37,7 @@ func connectorValidationFunction(connectorTypesService services.ConnectorTypesSe
return func() *errors.ServiceError {
ct, err := connectorTypesService.Get(*connectorTypeId)
if err != nil {
return errors.BadRequest("YYY invalid connector type id %v : %s", connectorTypeId, err)
return errors.BadRequest("Invalid connector type id %v : %s", connectorTypeId, err)
}

if !arrays.Contains(ct.ChannelNames(), string(*channel)) {
Expand Down
32 changes: 25 additions & 7 deletions internal/connector/internal/services/connector_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,20 @@ func (k *connectorClusterService) GetClientID(clusterID string) (string, error)
// SaveDeployment creates a connector deployment in the database
func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource *dbapi.ConnectorDeployment) *errors.ServiceError {
dbConn := k.connectionFactory.New()
if resource.Version != 0 {
dbConn = dbConn.Where("version = ?", resource.Version)
}

if err := dbConn.Save(resource).Error; err != nil {
saveResult := dbConn.Save(resource)
if err := saveResult.Error; err != nil {
return services.HandleCreateError(`Connector deployment`, err)
}
if saveResult.RowsAffected == 0 {
return errors.Conflict("Optimistic locking: resource version changed from %v", resource.Version)
}

//refresh version
dbConn = k.connectionFactory.New()
if err := dbConn.Where("id = ?", resource.ID).Select("version").First(&resource).Error; err != nil {
return services.HandleGetError(`Connector deployment`, "id", resource.ID, err)
}
Expand All @@ -416,11 +425,20 @@ func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource *

func (k *connectorClusterService) UpdateDeployment(resource *dbapi.ConnectorDeployment) *errors.ServiceError {
dbConn := k.connectionFactory.New()

if resource.Version != 0 {
dbConn = dbConn.Where("version = ?", resource.Version)
}

updates := dbConn.Where("id = ?", resource.ID).
Updates(resource)
if err := updates.Error; err != nil {
return services.HandleUpdateError(`Connector namespace`, err)
return services.HandleUpdateError(`Connector deployment`, err)
}
if updates.RowsAffected == 0 {
return errors.Conflict("Optimistic locking: resource version changed from %v", resource.Version)
}

return nil
}

Expand Down Expand Up @@ -494,7 +512,7 @@ func (k *connectorClusterService) ListConnectorDeployments(ctx context.Context,
func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Context, deploymentStatus dbapi.ConnectorDeploymentStatus) *errors.ServiceError {
dbConn := k.connectionFactory.New()

// lets get the connector id of the deployment..
// let's get the connector id of the deployment...
deployment := dbapi.ConnectorDeployment{}
if err := dbConn.Unscoped().Select("connector_id", "deleted_at").
Where("id = ?", deploymentStatus.ID).
Expand All @@ -505,7 +523,7 @@ func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Co
return services.HandleGoneError("Connector deployment", "id", deploymentStatus.ID)
}

if err := dbConn.Model(&deploymentStatus).Where("id = ? and version <= ?", deploymentStatus.ID, deploymentStatus.Version).Save(&deploymentStatus).Error; err != nil {
if err := dbConn.Where("id = ? and version <= ?", deploymentStatus.ID, deploymentStatus.Version).Save(&deploymentStatus).Error; err != nil {
return errors.Conflict("failed to update deployment status: %s, probably a stale deployment status version was used: %d", err.Error(), deploymentStatus.Version)
}

Expand All @@ -531,8 +549,8 @@ func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Co
}
}

// update the connector status
if err := dbConn.Where("id = ?", deployment.ConnectorID).Updates(&connectorStatus).Error; err != nil {
// update the connector status, don't updated deleted statues
if err := dbConn.Where("deleted_at IS NULL AND id = ?", deployment.ConnectorID).Updates(&connectorStatus).Error; err != nil {
return services.HandleUpdateError("Connector status", err)
}

Expand Down Expand Up @@ -564,7 +582,7 @@ func (k *connectorClusterService) FindAvailableNamespace(owner string, orgID str

func (k *connectorClusterService) GetDeploymentByConnectorId(ctx context.Context, connectorID string) (resource dbapi.ConnectorDeployment, serr *errors.ServiceError) {

if err := k.connectionFactory.New().Preload(clause.Associations).
if err := k.connectionFactory.New().
Joins("Status").Joins("ConnectorShardMetadata").Joins("Connector").
Where("connector_id = ?", connectorID).First(&resource).Error; err != nil {
return resource, services.HandleGetError("Connector deployment", "connector_id", connectorID, err)
Expand Down

0 comments on commit 5877552

Please sign in to comment.