Skip to content

Commit

Permalink
Merge branch 'queue-retry' into queue-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed May 30, 2024
2 parents d5af090 + f1bdea0 commit f294272
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/orchestrator/planner/state_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (s *StateUpdater) Process(ctx context.Context, plan *models.Plan) error {
// and the job state is not being updated, there is nothing to do.
if len(plan.NewExecutions) == 0 &&
len(plan.UpdatedExecutions) == 0 &&
len(plan.NewEvaluations) == 0 &&
plan.DesiredJobState.IsUndefined() {
return nil
}
Expand Down
31 changes: 29 additions & 2 deletions pkg/orchestrator/planner/state_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"

"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/test/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"
)

type StateUpdaterSuite struct {
Expand Down Expand Up @@ -86,6 +87,29 @@ func (suite *StateUpdaterSuite) TestStateUpdater_Process_UpdateJobState_Error()
suite.Error(suite.stateUpdater.Process(suite.ctx, plan))
}

func (suite *StateUpdaterSuite) TestStateUpdater_Process_CreateEvaluations_Success() {
plan := mock.Plan()
evaluation1, evaluation2 := mockCreateEvaluations(plan)

suite.mockStore.EXPECT().BeginTx(suite.ctx).Return(suite.mockTxContext, nil).Times(1)
suite.mockStore.EXPECT().CreateEvaluation(suite.mockTxContext, *evaluation1).Times(1)
suite.mockStore.EXPECT().CreateEvaluation(suite.mockTxContext, *evaluation2).Times(1)
suite.mockTxContext.EXPECT().Commit()

suite.NoError(suite.stateUpdater.Process(suite.ctx, plan))
}

func (suite *StateUpdaterSuite) TestStateUpdater_Process_CreateEvaluations_Error() {
plan := mock.Plan()
evaluation1, _ := mockCreateEvaluations(plan)

suite.mockStore.EXPECT().BeginTx(suite.ctx).Return(suite.mockTxContext, nil).Times(1)
suite.mockStore.EXPECT().CreateEvaluation(suite.mockTxContext, *evaluation1).Return(errors.New("create error")).Times(1)
suite.mockTxContext.EXPECT().Rollback()

suite.Error(suite.stateUpdater.Process(suite.ctx, plan))
}

func (suite *StateUpdaterSuite) TestStateUpdater_Process_NoOp() {
plan := mock.Plan()
suite.NoError(suite.stateUpdater.Process(suite.ctx, plan))
Expand All @@ -96,6 +120,7 @@ func (suite *StateUpdaterSuite) TestStateUpdater_Process_MultiOp() {
execution1, execution2 := mockCreateExecutions(plan)

update1, update2 := mockUpdateExecutions(plan)
evaluation1, evaluation2 := mockCreateEvaluations(plan)
plan.DesiredJobState = models.JobStateTypeCompleted

suite.mockStore.EXPECT().BeginTx(suite.ctx).Return(suite.mockTxContext, nil).Times(1)
Expand All @@ -104,6 +129,8 @@ func (suite *StateUpdaterSuite) TestStateUpdater_Process_MultiOp() {
suite.mockStore.EXPECT().UpdateExecution(suite.mockTxContext, NewUpdateExecutionMatcherFromPlanUpdate(suite.T(), update1)).Times(1)
suite.mockStore.EXPECT().UpdateExecution(suite.mockTxContext, NewUpdateExecutionMatcherFromPlanUpdate(suite.T(), update2)).Times(1)
suite.mockStore.EXPECT().UpdateJobState(suite.mockTxContext, NewUpdateJobMatcherFromPlanUpdate(suite.T(), plan)).Times(1)
suite.mockStore.EXPECT().CreateEvaluation(suite.mockTxContext, *evaluation1).Times(1)
suite.mockStore.EXPECT().CreateEvaluation(suite.mockTxContext, *evaluation2).Times(1)
suite.mockTxContext.EXPECT().Commit()

suite.NoError(suite.stateUpdater.Process(suite.ctx, plan))
Expand Down
12 changes: 11 additions & 1 deletion pkg/orchestrator/planner/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"

"github.com/bacalhau-project/bacalhau/pkg/compute"
"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/test/mock"
"github.com/stretchr/testify/assert"
)

func mockCreateExecutions(plan *models.Plan) (*models.Execution, *models.Execution) {
Expand Down Expand Up @@ -41,6 +42,15 @@ func mockUpdateExecutions(plan *models.Plan) (*models.PlanExecutionDesiredUpdate
return update1, update2
}

func mockCreateEvaluations(plan *models.Plan) (*models.Evaluation, *models.Evaluation) {
evaluation1 := mock.EvalForJob(plan.Job)
evaluation2 := mock.EvalForJob(plan.Job)
evaluation1.ID = "NewEval1"
evaluation2.ID = "NewEval2"
plan.NewEvaluations = []*models.Evaluation{evaluation1, evaluation2}
return evaluation1, evaluation2
}

// UpdateExecutionMatcher is a matcher for the UpdateExecutionState method of the JobStore interface.
type UpdateExecutionMatcher struct {
t *testing.T
Expand Down

0 comments on commit f294272

Please sign in to comment.