Skip to content

Commit

Permalink
fix: add transactions for numbers
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <[email protected]>
  • Loading branch information
vsukhin committed Aug 5, 2024
1 parent c25e05d commit 00b6f5c
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 46 deletions.
4 changes: 2 additions & 2 deletions pkg/cloud/data/testworkflow/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (r *CloudRepository) GetExecutionsSummary(ctx context.Context, filter testw
return pass(r.executor, ctx, req, process)
}

func (r *CloudRepository) Insert(ctx context.Context, result testkube.TestWorkflowExecution) (err error) {
req := ExecutionInsertRequest{WorkflowExecution: result}
func (r *CloudRepository) Insert(ctx context.Context, result *testkube.TestWorkflowExecution) (err error) {
req := ExecutionInsertRequest{WorkflowExecution: *result}
return passNoContent(r.executor, ctx, req)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/testworkflow/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Repository interface {
// GetPreviousFinishedState gets previous finished execution state by test
GetPreviousFinishedState(ctx context.Context, testName string, date time.Time) (testkube.TestWorkflowStatus, error)
// Insert inserts new execution result
Insert(ctx context.Context, result testkube.TestWorkflowExecution) error
Insert(ctx context.Context, result *testkube.TestWorkflowExecution) error
// Update updates execution
Update(ctx context.Context, result testkube.TestWorkflowExecution) error
// UpdateResult updates execution result
Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/testworkflow/mock_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 33 additions & 2 deletions pkg/repository/testworkflow/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"

"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/repository/sequence"
Expand Down Expand Up @@ -268,12 +269,42 @@ func (r *MongoRepository) GetExecutionsSummary(ctx context.Context, filter Filte
return
}

func (r *MongoRepository) Insert(ctx context.Context, result testkube.TestWorkflowExecution) (err error) {
func (r *MongoRepository) Insert(ctx context.Context, result *testkube.TestWorkflowExecution) (err error) {
result.EscapeDots()
if result.Reports == nil {
result.Reports = []testkube.TestWorkflowReport{}
}
_, err = r.Coll.InsertOne(ctx, result)

wc := writeconcern.New(writeconcern.WMajority())
txnOptions := options.Transaction().SetWriteConcern(wc)
session, err := r.db.Client().StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)

_, err = session.WithTransaction(ctx, func(sessCtx mongo.SessionContext) (interface{}, error) {
number, err := r.sequenceRepository.GetNextExecutionNumber(sessCtx, result.Workflow.Name, sequence.ExecutionTypeTestWorkflow)
if err != nil {
return nil, err
}

result.Number = number
if result.Name == "" {
result.Name = fmt.Sprintf("%s-%d", result.Workflow.Name, result.Number)
}

// Ensure it is unique name
// TODO: Consider if we shouldn't make name unique across all TestWorkflows
next, _ := r.GetByNameAndTestWorkflow(sessCtx, result.Name, result.Workflow.Name)
if next.Name == result.Name {
return nil, errors.New("execution name already exists")
}

_, err = r.Coll.InsertOne(sessCtx, result)
return nil, err
}, txnOptions)

return
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/repository/testworkflow/mongo_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/repository/sequence"
)

var (
Expand All @@ -32,13 +33,14 @@ func TestNewMongoRepository_UpdateReport_Integration(t *testing.T) {
db.Drop(ctx)
})

repo := NewMongoRepository(db, false)
seq := sequence.NewMongoRepository(db)
repo := NewMongoRepository(db, false, WithMongoRepositorySequence(seq))

execution := testkube.TestWorkflowExecution{
Id: "test-id",
Name: "test-name",
}
if err := repo.Insert(ctx, execution); err != nil {
if err := repo.Insert(ctx, &execution); err != nil {
t.Fatalf("error inserting execution: %v", err)
}

Expand Down
63 changes: 25 additions & 38 deletions pkg/testworkflows/testworkflowexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,69 +464,56 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor
return execution, errors.Wrap(err, "processing error")
}

// Load execution identifier data
number, err := e.repository.GetNextExecutionNumber(context.Background(), workflow.Name)
if err != nil {
log.DefaultLogger.Errorw("failed to retrieve TestWorkflow execution number", "id", id, "error", err)
}

executionName := request.Name
if executionName == "" {
executionName = fmt.Sprintf("%s-%d", workflow.Name, number)
}

testWorkflowExecutionName := request.TestWorkflowExecutionName
// Ensure it is unique name
// TODO: Consider if we shouldn't make name unique across all TestWorkflows
next, _ := e.repository.GetByNameAndTestWorkflow(ctx, executionName, workflow.Name)
if next.Name == executionName {
return execution, errors.Wrap(err, "execution name already exists")
}

// Build machine with actual execution data
executionMachine := expressions.NewMachine().Register("execution", map[string]interface{}{
"id": id,
"name": executionName,
"number": number,
"scheduledAt": now.UTC().Format(constants.RFC3339Millis),
"disableWebhooks": request.DisableWebhooks,
})

// Process the TestWorkflow
bundle, err := e.processor.Bundle(ctx, &workflow, machine, executionMachine)
if err != nil {
return execution, errors.Wrap(err, "processing error")
}

// Build Execution entity
// TODO: Consider storing "config" as well
execution = testkube.TestWorkflowExecution{
Id: id,
Name: executionName,
Name: request.Name,
Namespace: namespace,
Number: number,
ScheduledAt: now,
StatusAt: now,
Signature: stage.MapSignatureListToInternal(bundle.Signature),
Result: &testkube.TestWorkflowResult{
Status: common.Ptr(testkube.QUEUED_TestWorkflowStatus),
PredictedStatus: common.Ptr(testkube.PASSED_TestWorkflowStatus),
Initialization: &testkube.TestWorkflowStepResult{
Status: common.Ptr(testkube.QUEUED_TestWorkflowStepStatus),
},
Steps: stage.MapSignatureListToStepResults(bundle.Signature),
},
Output: []testkube.TestWorkflowOutput{},
Workflow: testworkflowmappers.MapKubeToAPI(initialWorkflow),
ResolvedWorkflow: testworkflowmappers.MapKubeToAPI(resolvedWorkflow),
TestWorkflowExecutionName: testWorkflowExecutionName,
DisableWebhooks: request.DisableWebhooks,
}
err = e.repository.Insert(ctx, execution)
err = e.repository.Insert(ctx, &execution)
if err != nil {
return execution, errors.Wrap(err, "inserting execution to storage")
}

// Build machine with actual execution data
executionMachine := expressions.NewMachine().Register("execution", map[string]interface{}{
"id": id,
"name": execution.Name,
"number": execution.Number,
"scheduledAt": now.UTC().Format(constants.RFC3339Millis),
"disableWebhooks": request.DisableWebhooks,
})

// Process the TestWorkflow
bundle, err := e.processor.Bundle(ctx, &workflow, machine, executionMachine)
if err != nil {
return execution, errors.Wrap(err, "processing error")
}

execution.Signature = stage.MapSignatureListToInternal(bundle.Signature)
execution.Result.Steps = stage.MapSignatureListToStepResults(bundle.Signature)

err = e.repository.Update(ctx, execution)
if err != nil {
return execution, errors.Wrap(err, "updating execution to storage")
}

// Inform about execution start
e.emitter.Notify(testkube.NewEventQueueTestWorkflow(&execution))

Expand Down

0 comments on commit 00b6f5c

Please sign in to comment.