Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connectors flaky tests fix #1717

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions internal/connector/internal/handlers/connector_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,11 @@ func (h *ConnectorAdminHandler) DeleteConnector(writer http.ResponseWriter, requ

// check force flag to force deletion of connector and deployments
if parseBoolParam(request.URL.Query().Get("force")) {
serviceError = h.ConnectorsService.ForceDelete(request.Context(), connectorId)
return nil, h.ConnectorsService.ForceDelete(request.Context(), connectorId)
} else {
ctx := request.Context()
return nil, HandleConnectorDelete(ctx, h.ConnectorsService, h.NamespaceService, connectorId)
}
return nil, serviceError
},
}

Expand Down Expand Up @@ -605,6 +604,7 @@ func (h *ConnectorAdminHandler) PatchConnectorDeployment(writer http.ResponseWri
// Handle the fields that support being updated...
var updatedDeployment dbapi.ConnectorDeployment
updatedDeployment.ID = existingDeployment.ID
updatedDeployment.Version = existingDeployment.Version
if len(resource.Spec.ShardMetadata) != 0 {
// channel update
updateRevision, err := workers.GetShardMetadataRevision(resource.Spec.ShardMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package handlers

import (
"context"
"strings"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi"
"k8s.io/apimachinery/pkg/util/validation"
"strings"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/arrays"

Expand Down Expand Up @@ -36,7 +37,7 @@ func connectorValidationFunction(connectorTypesService services.ConnectorTypesSe
return func() *errors.ServiceError {
ct, err := connectorTypesService.Get(*connectorTypeId)
if err != nil {
return errors.BadRequest("YYY invalid connector type id %v : %s", connectorTypeId, err)
return errors.BadRequest("Invalid connector type id %v : %s", connectorTypeId, err)
}

if !arrays.Contains(ct.ChannelNames(), string(*channel)) {
Expand Down
32 changes: 25 additions & 7 deletions internal/connector/internal/services/connector_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,20 @@ func (k *connectorClusterService) GetClientID(clusterID string) (string, error)
// SaveDeployment creates a connector deployment in the database
func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource *dbapi.ConnectorDeployment) *errors.ServiceError {
dbConn := k.connectionFactory.New()
if resource.Version != 0 {
dbConn = dbConn.Where("version = ?", resource.Version)
}

if err := dbConn.Save(resource).Error; err != nil {
saveResult := dbConn.Save(resource)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry about this.

See https://github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pull/1637/files#diff-aee57c287964506015af5e0e2839d1e78a19635c9ce232369e8be08a91101d89R508-R517

The behaviour of Save has changed in newer versions of gorm.

I believe PR #1717 and #1637 are effectively in a "race condition".

We should agree which should merge first and then check the other is unaffected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As stated in the more general comment, I would vote for merging this one first, since it fixes the tests and the current behavior, and then reasoning about how to deal with the new gorm version's behavior.

if err := saveResult.Error; err != nil {
return services.HandleCreateError(`Connector deployment`, err)
}
if saveResult.RowsAffected == 0 {
return errors.Conflict("Optimistic locking: resource version changed from %v", resource.Version)
}

//refresh version
dbConn = k.connectionFactory.New()
if err := dbConn.Where("id = ?", resource.ID).Select("version").First(&resource).Error; err != nil {
return services.HandleGetError(`Connector deployment`, "id", resource.ID, err)
}
Expand All @@ -416,11 +425,20 @@ func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource *

func (k *connectorClusterService) UpdateDeployment(resource *dbapi.ConnectorDeployment) *errors.ServiceError {
dbConn := k.connectionFactory.New()

if resource.Version != 0 {
dbConn = dbConn.Where("version = ?", resource.Version)
}

updates := dbConn.Where("id = ?", resource.ID).
Updates(resource)
if err := updates.Error; err != nil {
return services.HandleUpdateError(`Connector namespace`, err)
return services.HandleUpdateError(`Connector deployment`, err)
}
if updates.RowsAffected == 0 {
return errors.Conflict("Optimistic locking: resource version changed from %v", resource.Version)
}

return nil
}

Expand Down Expand Up @@ -494,7 +512,7 @@ func (k *connectorClusterService) ListConnectorDeployments(ctx context.Context,
func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Context, deploymentStatus dbapi.ConnectorDeploymentStatus) *errors.ServiceError {
dbConn := k.connectionFactory.New()

// lets get the connector id of the deployment..
// let's get the connector id of the deployment...
deployment := dbapi.ConnectorDeployment{}
if err := dbConn.Unscoped().Select("connector_id", "deleted_at").
Where("id = ?", deploymentStatus.ID).
Expand All @@ -505,7 +523,7 @@ func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Co
return services.HandleGoneError("Connector deployment", "id", deploymentStatus.ID)
}

if err := dbConn.Model(&deploymentStatus).Where("id = ? and version <= ?", deploymentStatus.ID, deploymentStatus.Version).Save(&deploymentStatus).Error; err != nil {
if err := dbConn.Where("id = ? and version <= ?", deploymentStatus.ID, deploymentStatus.Version).Save(&deploymentStatus).Error; err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I just removed the use of Model() with Save(), as it is not supported and can lead to undefined behavior (as per the first NOTE in the documentation: https://gorm.io/docs/update.html)

So I believe we can merge this one and then reason about the whole // See https://github.com/go-gorm/gorm/issues/6139 // We can no longer rely on the Save throwing a constraint violation exception when upserting a record that already exists. // By first checking that the record exists we ensure the Save only updates an existing record vs trying to create a duplicate.

return errors.Conflict("failed to update deployment status: %s, probably a stale deployment status version was used: %d", err.Error(), deploymentStatus.Version)
}

Expand All @@ -531,8 +549,8 @@ func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Co
}
}

// update the connector status
if err := dbConn.Where("id = ?", deployment.ConnectorID).Updates(&connectorStatus).Error; err != nil {
// update the connector status, don't update deleted statuses
if err := dbConn.Where("deleted_at IS NULL AND id = ?", deployment.ConnectorID).Updates(&connectorStatus).Error; err != nil {
return services.HandleUpdateError("Connector status", err)
}

Expand Down Expand Up @@ -564,7 +582,7 @@ func (k *connectorClusterService) FindAvailableNamespace(owner string, orgID str

func (k *connectorClusterService) GetDeploymentByConnectorId(ctx context.Context, connectorID string) (resource dbapi.ConnectorDeployment, serr *errors.ServiceError) {

if err := k.connectionFactory.New().Preload(clause.Associations).
if err := k.connectionFactory.New().
Joins("Status").Joins("ConnectorShardMetadata").Joins("Connector").
Where("connector_id = ?", connectorID).First(&resource).Error; err != nil {
return resource, services.HandleGetError("Connector deployment", "connector_id", connectorID, err)
Expand Down
17 changes: 10 additions & 7 deletions internal/connector/internal/services/connector_namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package services
import (
"context"
"fmt"
"gorm.io/gorm/clause"
"math/rand"
"strings"
"time"

"gorm.io/gorm/clause"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/profiles"

"reflect"
Expand Down Expand Up @@ -214,12 +215,14 @@ func (k *connectorNamespaceService) setConnectorsDeployed(namespaces dbapi.Conne
Id string
Count int32
}, 0)
if err := k.connectionFactory.New().Model(&dbapi.ConnectorDeployment{}).
Select("namespace_id as id, count(*) as count").
Group("namespace_id").
Where("namespace_id in ?", ids).
Find(&result).Error; err != nil {
return services.HandleGetError(`Connector namespace`, `id`, ids, err)
if err := k.connectionFactory.New().
Table("connector_namespaces").
Joins("left join connector_deployments on connector_namespaces.id = namespace_id").
Select("connector_namespaces.id as id, count(*) as count").
Group("connector_namespaces.id").
Where("namespace_id is not null and connector_deployments.deleted_at is null and connector_namespaces.id in ?", ids).
Scan(&result).Error; err != nil {
return services.HandleGetError(`Connector namespace deployment count`, `namespaces ids`, ids, err)
}

// set counts for non-empty ns
Expand Down
16 changes: 11 additions & 5 deletions internal/connector/internal/services/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"encoding/base64"
"fmt"
"gorm.io/gorm/clause"
"regexp"
"strings"

"gorm.io/gorm/clause"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi"
Expand Down Expand Up @@ -35,7 +36,7 @@ type ConnectorsService interface {
Update(ctx context.Context, resource *dbapi.Connector) *errors.ServiceError
SaveStatus(ctx context.Context, resource dbapi.ConnectorStatus) *errors.ServiceError
Delete(ctx context.Context, id string) *errors.ServiceError
ForEach(f func(*dbapi.Connector) *errors.ServiceError, query string, args ...interface{}) []error
ForEach(f func(*dbapi.Connector) *errors.ServiceError, joins string, query string, args ...interface{}) []error
ForceDelete(ctx context.Context, id string) *errors.ServiceError

ResolveConnectorRefsWithBase64Secrets(resource *dbapi.Connector) (bool, *errors.ServiceError)
Expand Down Expand Up @@ -351,16 +352,21 @@ func (k *connectorsService) Update(ctx context.Context, resource *dbapi.Connecto
return nil
}

func (k *connectorsService) SaveStatus(ctx context.Context, resource dbapi.ConnectorStatus) *errors.ServiceError {
func (k *connectorsService) SaveStatus(ctx context.Context, connectorStatus dbapi.ConnectorStatus) *errors.ServiceError {
dbConn := k.connectionFactory.New()
if err := dbConn.Model(resource).Save(resource).Error; err != nil {
if err := dbConn.Where("deleted_at IS NULL").Save(&connectorStatus).Error; err != nil {
valdar marked this conversation as resolved.
Show resolved Hide resolved
return errors.GeneralError("failed to update: %s", err.Error())
}
return nil
}

func (k *connectorsService) ForEach(f func(*dbapi.Connector) *errors.ServiceError, query string, args ...interface{}) []error {
func (k *connectorsService) ForEach(f func(*dbapi.Connector) *errors.ServiceError, joins string, query string, args ...interface{}) []error {
dbConn := k.connectionFactory.New()

if joins != "" {
dbConn = dbConn.Joins(joins)
}

rows, err := dbConn.
Model(&dbapi.Connector{}).
Where(query, args...).
Expand Down
29 changes: 8 additions & 21 deletions internal/connector/internal/workers/connector_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type ConnectorManager struct {
connectorClusterService services.ConnectorClusterService
connectorTypesService services.ConnectorTypesService
vaultService vault.VaultService
lastVersion int64
db *db.ConnectionFactory
ctx context.Context
}
Expand Down Expand Up @@ -79,26 +78,25 @@ func (k *ConnectorManager) Reconcile() []error {
}

// reconcile assigning connectors in "ready" desired state with "assigning" phase and a valid namespace id
k.doReconcile(&errs, "assigning", k.reconcileAssigning,
k.doReconcile(&errs, "assigning", k.reconcileAssigning, "",
"desired_state = ? AND phase = ? AND connectors.namespace_id IS NOT NULL", dbapi.ConnectorReady, dbapi.ConnectorStatusPhaseAssigning)

// reconcile unassigned connectors in "unassigned" desired state and "deleted" phase
k.doReconcile(&errs, "unassigned", k.reconcileUnassigned,
k.doReconcile(&errs, "unassigned", k.reconcileUnassigned, "",
"desired_state = ? AND phase = ?", dbapi.ConnectorUnassigned, dbapi.ConnectorStatusPhaseDeleted)

// reconcile deleting connectors with no deployments
k.doReconcile(&errs, "deleting", k.reconcileDeleting,
k.doReconcile(&errs, "deleting", k.reconcileDeleting, "",
"desired_state = ? AND phase = ?", dbapi.ConnectorDeleted, dbapi.ConnectorStatusPhaseDeleting)

// reconcile deleted connectors with no deployments
k.doReconcile(&errs, "deleted", k.reconcileDeleted,
k.doReconcile(&errs, "deleted", k.reconcileDeleted, "",
"desired_state = ? AND phase IN ?", dbapi.ConnectorDeleted,
[]string{string(dbapi.ConnectorStatusPhaseAssigning), string(dbapi.ConnectorStatusPhaseDeleted)})

// reconcile connector updates for assigned connectors that aren't being deleted...
k.doReconcile(&errs, "updated", k.reconcileConnectorUpdate,
"version > ? AND phase NOT IN ?", k.lastVersion,
[]string{string(dbapi.ConnectorStatusPhaseAssigning), string(dbapi.ConnectorStatusPhaseDeleting), string(dbapi.ConnectorStatusPhaseDeleted)})
k.doReconcile(&errs, "updated", k.reconcileConnectorUpdate, "LEFT JOIN connector_deployments ON connectors.id = connector_deployments.connector_id",
"connectors.version <> connector_deployments.connector_version AND phase NOT IN ?", []string{string(dbapi.ConnectorStatusPhaseAssigning), string(dbapi.ConnectorStatusPhaseDeleting), string(dbapi.ConnectorStatusPhaseDeleted)})

return errs
}
Expand Down Expand Up @@ -248,21 +246,10 @@ func (k *ConnectorManager) reconcileConnectorUpdate(ctx context.Context, connect
}
}

if cerr := db.AddPostCommitAction(ctx, func() {
k.lastVersion = connector.Version
}); cerr != nil {
glog.Errorf("Failed to AddPostCommitAction to save lastVersion %d: %v", connector.Version, cerr.Error())
if err == nil {
err = cerr
} else {
err = errors.Errorf("Multiple errors in reconciling connector %s: %s; %s", connector.ID, err, cerr)
}
}

return err
}

func (k *ConnectorManager) doReconcile(errs *[]error, reconcilePhase string, reconcileFunc func(ctx context.Context, connector *dbapi.Connector) error, query string, args ...interface{}) {
func (k *ConnectorManager) doReconcile(errs *[]error, reconcilePhase string, reconcileFunc func(ctx context.Context, connector *dbapi.Connector) error, joins string, query string, args ...interface{}) {
var count int64
var serviceErrs []error
glog.V(5).Infof("Reconciling %s connectors...", reconcilePhase)
Expand All @@ -276,7 +263,7 @@ func (k *ConnectorManager) doReconcile(errs *[]error, reconcilePhase string, rec
count++
return nil
})
}, query, args...); len(serviceErrs) > 0 {
}, joins, query, args...); len(serviceErrs) > 0 {
*errs = append(*errs, serviceErrs...)
}
if count == 0 && len(serviceErrs) == 0 {
Expand Down
16 changes: 12 additions & 4 deletions internal/connector/test/integration/cucumber_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/url"
"time"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/arrays"
"github.com/golang/glog"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (s *extender) iResetTheVaultCounters() error {
}

func (s *extender) theVaultDeleteCounterShouldBe(expected int64) error {
// we can only check the delete count on the TmpVault service impl...
// we can only check delete count on the TmpVault service impl...
var service vault.VaultService
if err := s.Suite.Helper.Env.ServiceContainer.Resolve(&service); err != nil {
return err
Expand Down Expand Up @@ -106,19 +107,27 @@ func (s *extender) getAndStoreAccessTokenUsingTheAddonParameterResponseAs(as str

const clientIdList = "_client_id_list"

func (s *extender) deleteKeycloakClients(sc *godog.Scenario, err error) {
func (s *extender) deleteKeycloakClients() error {

if clientIds, ok := s.Variables[clientIdList].([]string); ok {
env := s.Suite.Helper.Env
var keycloakService sso.KafkaKeycloakService
env.MustResolve(&keycloakService)

var collectedErrors errors.ErrorList
for _, clientID := range clientIds {
if err := keycloakService.DeleteServiceAccountInternal(clientID); err != nil {
glog.Errorf("Error deleting keycloak client with clientId %s: %s", clientID, err)
collectedErrors.AddErrors(err)
}
}

if !collectedErrors.IsEmpty() {
return collectedErrors
}

}
return nil
}

func (s *extender) rememberKeycloakClientForCleanup(clientID string) error {
Expand Down Expand Up @@ -201,8 +210,7 @@ func init() {
ctx.Step(`^I delete or deprecate types not in latest connector catalog$`, e.iDeleteOrDeprecateRemovedTypes)

ctx.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) {
e.deleteKeycloakClients(sc, err)
return ctx, err
return ctx, e.deleteKeycloakClients()
})
})
}
14 changes: 6 additions & 8 deletions internal/connector/test/integration/feature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,15 @@ func TestFeatures(t *testing.T) {
testName = strings.ReplaceAll(testName, "-", "_")

t.Run(testName, func(t *testing.T) {
// To preserve the current behavior, the tests are marked to be "safely" run in parallel, however
// we may think to introduce a new naming convention i.e. files that ends with _parallel would
// cause t.Parallel() to be invoked, other tests won't, so they won't be executed concurrently.
//
// This could help reducing/removing the need of explicit lock
t.Parallel()
// The assumption is that features run sequentially and the scenarios within a feature run concurrently
valdar marked this conversation as resolved.
Show resolved Hide resolved
// Running features in parallel breaks the locking steps and the test users' session mechanisms
//t.Parallel()

o := opts
o.TestingT = t
o.Paths = []string{path.Join(root, info.Name())}
//o.Randomize = -1
o.Randomize = -1
o.StopOnFailure = true
_, exists := os.LookupEnv("GODOG_NO_COLORS")
if exists {
o.NoColors = true
Expand All @@ -135,7 +133,7 @@ func TestFeatures(t *testing.T) {
s := cucumber.NewTestSuite(helper)

status := godog.TestSuite{
Name: "connectors",
Name: "connectors-" + testName,
Options: &o,
ScenarioInitializer: s.InitializeScenario,
}.Run()
Expand Down
Loading