From 5877552dfed01705fded596e20736cf18e52da76 Mon Sep 17 00:00:00 2001 From: Andrea Tarocchi Date: Fri, 21 Apr 2023 22:31:03 +0200 Subject: [PATCH] Implemented optimistic locking for Deployments connector updates/patch. --- .../internal/handlers/connector_admin.go | 4 +-- .../internal/handlers/connector_validation.go | 5 +-- .../internal/services/connector_cluster.go | 32 +++++++++++++++---- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/internal/connector/internal/handlers/connector_admin.go b/internal/connector/internal/handlers/connector_admin.go index 7c42a62ed..6e31cc082 100644 --- a/internal/connector/internal/handlers/connector_admin.go +++ b/internal/connector/internal/handlers/connector_admin.go @@ -395,12 +395,11 @@ func (h *ConnectorAdminHandler) DeleteConnector(writer http.ResponseWriter, requ // check force flag to force deletion of connector and deployments if parseBoolParam(request.URL.Query().Get("force")) { - serviceError = h.ConnectorsService.ForceDelete(request.Context(), connectorId) + return nil, h.ConnectorsService.ForceDelete(request.Context(), connectorId) } else { ctx := request.Context() return nil, HandleConnectorDelete(ctx, h.ConnectorsService, h.NamespaceService, connectorId) } - return nil, serviceError }, } @@ -605,6 +604,7 @@ func (h *ConnectorAdminHandler) PatchConnectorDeployment(writer http.ResponseWri // Handle the fields that support being updated... var updatedDeployment dbapi.ConnectorDeployment updatedDeployment.ID = existingDeployment.ID + updatedDeployment.Version = existingDeployment.Version if len(resource.Spec.ShardMetadata) != 0 { // channel update updateRevision, err := workers.GetShardMetadataRevision(resource.Spec.ShardMetadata) diff --git a/internal/connector/internal/handlers/connector_validation.go b/internal/connector/internal/handlers/connector_validation.go index d0b5a97be..e5afa096b 100644 --- a/internal/connector/internal/handlers/connector_validation.go +++ b/internal/connector/internal/handlers/connector_validation.go @@ -2,9 +2,10 @@ package handlers import ( "context" + "strings" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" "k8s.io/apimachinery/pkg/util/validation" - "strings" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/arrays" @@ -36,7 +37,7 @@ func connectorValidationFunction(connectorTypesService services.ConnectorTypesSe return func() *errors.ServiceError { ct, err := connectorTypesService.Get(*connectorTypeId) if err != nil { - return errors.BadRequest("YYY invalid connector type id %v : %s", connectorTypeId, err) + return errors.BadRequest("Invalid connector type id %v : %s", connectorTypeId, err) } if !arrays.Contains(ct.ChannelNames(), string(*channel)) { diff --git a/internal/connector/internal/services/connector_cluster.go b/internal/connector/internal/services/connector_cluster.go index edf2dfe6d..df921e3fe 100644 --- a/internal/connector/internal/services/connector_cluster.go +++ b/internal/connector/internal/services/connector_cluster.go @@ -396,11 +396,20 @@ func (k *connectorClusterService) GetClientID(clusterID string) (string, error) // SaveDeployment creates a connector deployment in the database func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource *dbapi.ConnectorDeployment) *errors.ServiceError { dbConn := k.connectionFactory.New() + if resource.Version != 0 { + dbConn = dbConn.Where("version = ?", resource.Version) + } - if err := dbConn.Save(resource).Error; err != nil { + saveResult := dbConn.Save(resource) + if err := saveResult.Error; err != nil { return services.HandleCreateError(`Connector deployment`, err) } + if saveResult.RowsAffected == 0 { + return errors.Conflict("Optimistic locking: resource version changed from %v", resource.Version) + } + //refresh version + dbConn = k.connectionFactory.New() if err := dbConn.Where("id = ?", resource.ID).Select("version").First(&resource).Error; err != nil { return services.HandleGetError(`Connector deployment`, "id", resource.ID, err) } @@ -416,11 +425,20 @@ func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource * func (k *connectorClusterService) UpdateDeployment(resource *dbapi.ConnectorDeployment) *errors.ServiceError { dbConn := k.connectionFactory.New() + + if resource.Version != 0 { + dbConn = dbConn.Where("version = ?", resource.Version) + } + updates := dbConn.Where("id = ?", resource.ID). Updates(resource) if err := updates.Error; err != nil { - return services.HandleUpdateError(`Connector namespace`, err) + return services.HandleUpdateError(`Connector deployment`, err) + } + if updates.RowsAffected == 0 { + return errors.Conflict("Optimistic locking: resource version changed from %v", resource.Version) } + return nil } @@ -494,7 +512,7 @@ func (k *connectorClusterService) ListConnectorDeployments(ctx context.Context, func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Context, deploymentStatus dbapi.ConnectorDeploymentStatus) *errors.ServiceError { dbConn := k.connectionFactory.New() - // lets get the connector id of the deployment.. + // let's get the connector id of the deployment... deployment := dbapi.ConnectorDeployment{} if err := dbConn.Unscoped().Select("connector_id", "deleted_at"). Where("id = ?", deploymentStatus.ID). @@ -505,7 +523,7 @@ func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Co return services.HandleGoneError("Connector deployment", "id", deploymentStatus.ID) } - if err := dbConn.Model(&deploymentStatus).Where("id = ? and version <= ?", deploymentStatus.ID, deploymentStatus.Version).Save(&deploymentStatus).Error; err != nil { + if err := dbConn.Where("id = ? and version <= ?", deploymentStatus.ID, deploymentStatus.Version).Save(&deploymentStatus).Error; err != nil { return errors.Conflict("failed to update deployment status: %s, probably a stale deployment status version was used: %d", err.Error(), deploymentStatus.Version) } @@ -531,8 +549,8 @@ func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Co } } - // update the connector status - if err := dbConn.Where("id = ?", deployment.ConnectorID).Updates(&connectorStatus).Error; err != nil { + // update the connector status, don't updated deleted statues + if err := dbConn.Where("deleted_at IS NULL AND id = ?", deployment.ConnectorID).Updates(&connectorStatus).Error; err != nil { return services.HandleUpdateError("Connector status", err) } @@ -564,7 +582,7 @@ func (k *connectorClusterService) FindAvailableNamespace(owner string, orgID str func (k *connectorClusterService) GetDeploymentByConnectorId(ctx context.Context, connectorID string) (resource dbapi.ConnectorDeployment, serr *errors.ServiceError) { - if err := k.connectionFactory.New().Preload(clause.Associations). + if err := k.connectionFactory.New(). Joins("Status").Joins("ConnectorShardMetadata").Joins("Connector"). Where("connector_id = ?", connectorID).First(&resource).Error; err != nil { return resource, services.HandleGetError("Connector deployment", "connector_id", connectorID, err)