Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow cancel of queued job #43

Merged
merged 5 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions prunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ func (j *PipelineJob) deinitScheduler() {
j.taskRunner = nil
}

func (j *PipelineJob) markAsCanceled() {
j.Canceled = true
for i := range j.Tasks {
j.Tasks[i].Canceled = true
}
}

// jobTask is a single task invocation inside the PipelineJob
type jobTask struct {
definition.TaskDef
Expand Down Expand Up @@ -185,7 +192,6 @@ var errNoQueue = errors.New("concurrency exceeded and queueing disabled for pipe
var errQueueFull = errors.New("concurrency exceeded and queue limit reached for pipeline")
var ErrJobNotFound = errors.New("job not found")
var errJobAlreadyCompleted = errors.New("job is already completed")
var errJobNotStarted = errors.New("job is not started")
var ErrShuttingDown = errors.New("runner is shutting down")

func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) {
Expand Down Expand Up @@ -937,7 +943,15 @@ func (r *PipelineRunner) cancelJobInternal(id uuid.UUID) error {
}

if job.Start == nil {
return errJobNotStarted
job.markAsCanceled()

log.
WithField("component", "runner").
WithField("pipeline", job.Pipeline).
WithField("jobID", job.ID).
Debugf("Marked job as canceled, since it was not started")

return nil
}

log.
Expand Down
78 changes: 78 additions & 0 deletions prunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,72 @@ func TestPipelineRunner_CancelJob_WithRunningJob(t *testing.T) {
}
}

func TestPipelineRunner_CancelJob_WithQueuedJob(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
"long_running": {
// Concurrency of 1 is the default for a single concurrent execution
Concurrency: 1,
QueueLimit: nil,
Tasks: map[string]definition.TaskDef{
"sleep": {
Script: []string{"# that takes long"},
},
},
SourcePath: "fixtures",
},
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var mx sync.Mutex
var startedJobsIDs []string
var wait = make(chan struct{})

pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner {
return &test.MockRunner{
OnRun: func(t *task.Task) error {
jobID := t.Variables.Get(taskctl.JobIDVariableName)
mx.Lock()
startedJobsIDs = append(startedJobsIDs, jobID.(string))
mx.Unlock()

// Wait until the job should proceed (wait channel is closed)
<-wait

return nil
},
}
}, nil, test.NewMockOutputStore())
require.NoError(t, err)

job1, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{})
require.NoError(t, err)

job2, err := pRunner.ScheduleAsync("long_running", ScheduleOpts{})
require.NoError(t, err)

waitForStartedJobTask(t, pRunner, job1.ID, "sleep")

// Make sure the queued job can be canceled
err = pRunner.CancelJob(job2.ID)
require.NoError(t, err)

// Close the channel to let the first job proceed
close(wait)

waitForCompletedJob(t, pRunner, job1.ID)
waitForCanceledJob(t, pRunner, job2.ID)

assert.Equal(t, true, job2.Tasks.ByName("sleep").Canceled, "job task was marked as canceled")

mx.Lock()
defer mx.Unlock()
assert.Equal(t, []string{job1.ID.String()}, startedJobsIDs, "only job1 was started")
}

func TestPipelineRunner_CancelJob_WithStoppedJob_ShouldNotThrowFatalError(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
Expand Down Expand Up @@ -456,6 +522,18 @@ func waitForCompletedJob(t *testing.T, pRunner *PipelineRunner, jobID uuid.UUID)
}, 1*time.Millisecond, "job completed")
}

func waitForCanceledJob(t *testing.T, pRunner *PipelineRunner, jobID uuid.UUID) {
t.Helper()

test.WaitForCondition(t, func() bool {
var canceled bool
_ = pRunner.ReadJob(jobID, func(j *PipelineJob) {
canceled = j.Canceled
})
return canceled
}, 1*time.Millisecond, "job canceled")
}

func TestPipelineRunner_ShouldRemoveOldJobsWhenRetentionPeriodIsConfigured(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
Expand Down
Loading