Skip to content

Commit

Permalink
fix: add sequence for test workflows
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <[email protected]>
  • Loading branch information
vsukhin committed Jul 23, 2024
1 parent e0b39a4 commit 91636ba
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 12 deletions.
7 changes: 4 additions & 3 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,10 @@ func main() {
mongoResultsRepository := result.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, result.WithFeatureFlags(features),
result.WithLogsClient(logGrpcClient), result.WithMongoRepositorySequence(sequenceRepository))
resultsRepository = mongoResultsRepository
testResultsRepository = testresult.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb, testresult.WithMongoRepositorySequence(sequenceRepository))
testWorkflowResultsRepository = testworkflow2.NewMongoRepository(db, cfg.APIMongoAllowDiskUse)
testResultsRepository = testresult.NewMongoRepository(db, cfg.APIMongoAllowDiskUse, isDocDb,
testresult.WithMongoRepositorySequence(sequenceRepository))
testWorkflowResultsRepository = testworkflow2.NewMongoRepository(db, cfg.APIMongoAllowDiskUse,
testworkflow2.WithMongoRepositorySequence(sequenceRepository))
configRepository = configrepository.NewMongoRepository(db)
triggerLeaseBackend = triggers.NewMongoLeaseBackend(db)
minioClient := newStorageClient(cfg)
Expand Down Expand Up @@ -573,7 +575,6 @@ func main() {
testWorkflowTemplatesClient,
testWorkflowProcessor,
configMapConfig,
resultsRepository,
testWorkflowExecutionsClient,
testWorkflowsClient,
metrics,
Expand Down
4 changes: 4 additions & 0 deletions pkg/cloud/data/testworkflow/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,7 @@ func (r *CloudRepository) GetPreviousFinishedState(ctx context.Context, workflow
}
return commandResponse.Result, nil
}

func (r *CloudRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) {
return 0, nil
}
6 changes: 6 additions & 0 deletions pkg/repository/testworkflow/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Filter interface {

//go:generate mockgen -destination=./mock_repository.go -package=testworkflow "github.com/kubeshop/testkube/pkg/repository/testworkflow" Repository
type Repository interface {
Sequences
// Get gets execution result by id or name
Get(ctx context.Context, id string) (testkube.TestWorkflowExecution, error)
// GetByNameAndTestWorkflow gets execution result by name
Expand Down Expand Up @@ -68,6 +69,11 @@ type Repository interface {
GetTestWorkflowMetrics(ctx context.Context, name string, limit, last int) (metrics testkube.ExecutionsMetrics, err error)
}

type Sequences interface {
// GetNextExecutionNumber gets next execution number by name
GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error)
}

//go:generate mockgen -destination=./mock_output_repository.go -package=testworkflow "github.com/kubeshop/testkube/pkg/repository/testworkflow" OutputRepository
type OutputRepository interface {
// PresignSaveLog builds presigned storage URL to save the output in Minio
Expand Down
15 changes: 15 additions & 0 deletions pkg/repository/testworkflow/mock_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 42 additions & 3 deletions pkg/repository/testworkflow/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testworkflow

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -14,6 +15,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/repository/sequence"
)

var _ Repository = (*MongoRepository)(nil)
Expand All @@ -35,9 +37,10 @@ func NewMongoRepository(db *mongo.Database, allowDiskUse bool, opts ...MongoRepo
}

type MongoRepository struct {
db *mongo.Database
Coll *mongo.Collection
allowDiskUse bool
db *mongo.Database
Coll *mongo.Collection
allowDiskUse bool
sequenceRepository sequence.Repository
}

func WithMongoRepositoryCollection(collection *mongo.Collection) MongoRepositoryOpt {
Expand All @@ -46,6 +49,12 @@ func WithMongoRepositoryCollection(collection *mongo.Collection) MongoRepository
}
}

func WithMongoRepositorySequence(sequenceRepository sequence.Repository) MongoRepositoryOpt {
return func(r *MongoRepository) {
r.sequenceRepository = sequenceRepository
}
}

type MongoRepositoryOpt func(*MongoRepository)

func (r *MongoRepository) Get(ctx context.Context, id string) (result testkube.TestWorkflowExecution, err error) {
Expand Down Expand Up @@ -360,12 +369,26 @@ func composeQueryAndOpts(filter Filter) (bson.M, *options.FindOptions) {

// DeleteByTestWorkflow deletes execution results by workflow
func (r *MongoRepository) DeleteByTestWorkflow(ctx context.Context, workflowName string) (err error) {
if r.sequenceRepository != nil {
err = r.sequenceRepository.DeleteExecutionNumber(ctx, workflowName, sequence.ExecutionTypeTestWorkflow)
if err != nil {
return
}
}

_, err = r.Coll.DeleteMany(ctx, bson.M{"workflow.name": workflowName})
return
}

// DeleteAll deletes all execution results
func (r *MongoRepository) DeleteAll(ctx context.Context) (err error) {
if r.sequenceRepository != nil {
err = r.sequenceRepository.DeleteAllExecutionNumbers(ctx, sequence.ExecutionTypeTestWorkflow)
if err != nil {
return
}
}

_, err = r.Coll.DeleteMany(ctx, bson.M{})
return
}
Expand All @@ -383,6 +406,13 @@ func (r *MongoRepository) DeleteByTestWorkflows(ctx context.Context, workflowNam

filter := bson.M{"$or": conditions}

if r.sequenceRepository != nil {
err = r.sequenceRepository.DeleteExecutionNumbers(ctx, workflowNames, sequence.ExecutionTypeTestSuite)
if err != nil {
return
}
}

_, err = r.Coll.DeleteMany(ctx, filter)
return
}
Expand Down Expand Up @@ -456,3 +486,12 @@ func (r *MongoRepository) GetPreviousFinishedState(ctx context.Context, testWork

return *result.Result.Status, nil
}

// GetNextExecutionNumber gets next execution number by name
func (r *MongoRepository) GetNextExecutionNumber(ctx context.Context, name string) (number int32, err error) {
if r.sequenceRepository != nil {
return 0, errors.New("no sequence repository provided")
}

return r.sequenceRepository.GetNextExecutionNumber(ctx, name, sequence.ExecutionTypeTestWorkflow)
}
11 changes: 5 additions & 6 deletions pkg/testworkflows/testworkflowexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/kubeshop/testkube/pkg/log"
testworkflowmappers "github.com/kubeshop/testkube/pkg/mapper/testworkflows"
configRepo "github.com/kubeshop/testkube/pkg/repository/config"
"github.com/kubeshop/testkube/pkg/repository/result"
"github.com/kubeshop/testkube/pkg/repository/testworkflow"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor"
Expand All @@ -51,7 +50,6 @@ type executor struct {
testWorkflowTemplatesClient testworkflowsclientv1.TestWorkflowTemplatesInterface
processor testworkflowprocessor.Processor
configMap configRepo.Repository
executionResults result.Repository
testWorkflowExecutionsClient testworkflowsclientv1.TestWorkflowExecutionsInterface
testWorkflowsClient testworkflowsclientv1.Interface
metrics v1.Metrics
Expand All @@ -73,7 +71,6 @@ func New(emitter *event.Emitter,
testWorkflowTemplatesClient testworkflowsclientv1.TestWorkflowTemplatesInterface,
processor testworkflowprocessor.Processor,
configMap configRepo.Repository,
executionResults result.Repository,
testWorkflowExecutionsClient testworkflowsclientv1.TestWorkflowExecutionsInterface,
testWorkflowsClient testworkflowsclientv1.Interface,
metrics v1.Metrics,
Expand All @@ -92,7 +89,6 @@ func New(emitter *event.Emitter,
testWorkflowTemplatesClient: testWorkflowTemplatesClient,
processor: processor,
configMap: configMap,
executionResults: executionResults,
testWorkflowExecutionsClient: testWorkflowExecutionsClient,
testWorkflowsClient: testWorkflowsClient,
metrics: metrics,
Expand Down Expand Up @@ -465,8 +461,11 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor
}

// Load execution identifier data
// TODO: Consider if that should not be shared (as now it is between Tests and Test Suites)
number, _ := e.executionResults.GetNextExecutionNumber(context.Background(), workflow.Name)
number, err := e.repository.GetNextExecutionNumber(context.Background(), workflow.Name)
if err != nil {
log.DefaultLogger.Errorw("failed to retrieve TestWorkflow execution number", "id", id, "error", err)
}

executionName := request.Name
if executionName == "" {
executionName = fmt.Sprintf("%s-%d", workflow.Name, number)
Expand Down

0 comments on commit 91636ba

Please sign in to comment.