Skip to content

Commit

Permalink
feat: add the ability to pause and resume jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Feb 2, 2024
1 parent 7a5e12f commit 028b631
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 20 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ type Scheduler interface {
// scheduler's execution queue.
DeleteJob(jobKey *JobKey) error

// PauseJob suspends the job with the specified key from being
// executed by the scheduler.
PauseJob(jobKey *JobKey) error

// ResumeJob restarts the suspended job with the specified key.
ResumeJob(jobKey *JobKey) error

// Clear removes all of the scheduled jobs.
Clear() error

Expand Down
13 changes: 10 additions & 3 deletions quartz/job_detail.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package quartz

import "time"
import (
"time"
)

// JobDetailOptions represents additional JobDetail properties.
type JobDetailOptions struct {
Expand All @@ -13,19 +15,24 @@ type JobDetailOptions struct {
// Default: 1 second.
RetryInterval time.Duration

// Replace specifies whether the job should replace an existing job
// Replace indicates whether the job should replace an existing job
// with the same key.
// Default: false.
Replace bool

// Suspended indicates whether the job is paused.
// Default: false.
Suspended bool
}

// NewDefaultJobDetailOptions returns a new instance of JobDetailOptions
// with the default values.
func NewDefaultJobDetailOptions() *JobDetailOptions {
return &JobDetailOptions{
return &JobDetailOptions{ // using explicit default values for visibility
MaxRetries: 0,
RetryInterval: time.Second,
Replace: false,
Suspended: false,
}
}

Expand Down
97 changes: 86 additions & 11 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quartz
import (
"context"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -42,6 +43,13 @@ type Scheduler interface {
// scheduler's execution queue.
DeleteJob(jobKey *JobKey) error

// PauseJob suspends the job with the specified key from being
// executed by the scheduler.
PauseJob(jobKey *JobKey) error

// ResumeJob restarts the suspended job with the specified key.
ResumeJob(jobKey *JobKey) error

// Clear removes all of the scheduled jobs.
Clear() error

Expand Down Expand Up @@ -147,11 +155,14 @@ func (sched *StdScheduler) ScheduleJob(
if trigger == nil {
return illegalArgumentError("trigger is nil")
}
nextRunTime, err := trigger.NextFireTime(NowNano())
if err != nil {
return err
nextRunTime := int64(math.MaxInt64)
var err error
if !jobDetail.opts.Suspended {
nextRunTime, err = trigger.NextFireTime(NowNano())
if err != nil {
return err
}
}

toSchedule := &scheduledJob{
job: jobDetail,
trigger: trigger,
Expand Down Expand Up @@ -226,13 +237,7 @@ func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error)
if jobKey == nil {
return nil, illegalArgumentError("jobKey is nil")
}
scheduledJobs := sched.queue.ScheduledJobs()
for _, scheduled := range scheduledJobs {
if scheduled.JobDetail().jobKey.Equals(jobKey) {
return scheduled, nil
}
}
return nil, jobNotFoundError(fmt.Sprintf("for key %s", jobKey))
return sched.queue.Get(jobKey)
}

// DeleteJob removes the Job with the specified key if present.
Expand All @@ -250,6 +255,73 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
return err
}

// PauseJob suspends the job with the specified key from being
// executed by the scheduler.
func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {
if jobKey == nil {
return illegalArgumentError("jobKey is nil")
}
job, err := sched.queue.Get(jobKey)
if err != nil {
return err
}
if job.JobDetail().opts.Suspended {
return illegalStateError(fmt.Sprintf("job %s is suspended", jobKey))
}
job, err = sched.queue.Remove(jobKey)
if err == nil {
job.JobDetail().opts.Suspended = true
paused := &scheduledJob{
job: job.JobDetail(),
trigger: job.Trigger(),
priority: int64(math.MaxInt64),
}
err = sched.queue.Push(paused)
if err == nil {
logger.Debugf("Successfully paused job %s.", jobKey)
if sched.IsStarted() {
sched.reset()
}
}
}
return err
}

// ResumeJob restarts the suspended job with the specified key.
func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {
if jobKey == nil {
return illegalArgumentError("jobKey is nil")
}
job, err := sched.queue.Get(jobKey)
if err != nil {
return err
}
if !job.JobDetail().opts.Suspended {
return illegalStateError(fmt.Sprintf("job %s is active", jobKey))
}
job, err = sched.queue.Remove(jobKey)
if err == nil {
job.JobDetail().opts.Suspended = false
nextRunTime, err := job.Trigger().NextFireTime(NowNano())
if err != nil {
return err
}
resumed := &scheduledJob{
job: job.JobDetail(),
trigger: job.Trigger(),
priority: nextRunTime,
}
err = sched.queue.Push(resumed)
if err == nil {
logger.Debugf("Successfully resumed job %s.", jobKey)
if sched.IsStarted() {
sched.reset()
}
}
}
return err
}

// Clear removes all of the scheduled jobs.
func (sched *StdScheduler) Clear() error {
// reset the job queue
Expand Down Expand Up @@ -418,6 +490,9 @@ retryLoop:
}

func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, error)) {
if job.JobDetail().opts.Suspended {
return false, func() (int64, error) { return math.MaxInt64, nil }
}
now := NowNano()
if job.NextRunTime() < now-sched.opts.OutdatedThreshold.Nanoseconds() {
duration := time.Duration(now - job.NextRunTime())
Expand Down
76 changes: 70 additions & 6 deletions quartz/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestScheduler(t *testing.T) {
assert.Equal(t, errCurlJob.JobStatus(), job.StatusFailure)
}

func TestSchedulerBlockingSemantics(t *testing.T) {
func TestScheduler_BlockingSemantics(t *testing.T) {
for _, tt := range []string{"Blocking", "NonBlocking", "WorkerSmall", "WorkerLarge"} {
t.Run(tt, func(t *testing.T) {
var opts quartz.StdSchedulerOptions
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestSchedulerBlockingSemantics(t *testing.T) {
}
}

func TestSchedulerCancel(t *testing.T) {
func TestScheduler_Cancel(t *testing.T) {
hourJob := func(ctx context.Context) (bool, error) {
timer := time.NewTimer(time.Hour)
defer timer.Stop()
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestSchedulerCancel(t *testing.T) {
}
}

func TestSchedulerJobWithRetries(t *testing.T) {
func TestScheduler_JobWithRetries(t *testing.T) {
var n int32
funcRetryJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
atomic.AddInt32(&n, 1)
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestSchedulerJobWithRetries(t *testing.T) {
sched.Stop()
}

func TestSchedulerJobWithRetriesCtxDone(t *testing.T) {
func TestScheduler_JobWithRetriesCtxDone(t *testing.T) {
var n int32
funcRetryJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
atomic.AddInt32(&n, 1)
Expand Down Expand Up @@ -372,7 +372,63 @@ func TestSchedulerJobWithRetriesCtxDone(t *testing.T) {
sched.Stop()
}

func TestSchedulerArgumentValidationErrors(t *testing.T) {
func TestScheduler_PauseResume(t *testing.T) {
var n int32
funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
atomic.AddInt32(&n, 1)
return "ok", nil
})
sched := quartz.NewStdScheduler()
jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob"))
err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(10*time.Millisecond))
assert.Equal(t, err, nil)

assert.Equal(t, int(atomic.LoadInt32(&n)), 0)
sched.Start(context.Background())

time.Sleep(55 * time.Millisecond)
assert.Equal(t, int(atomic.LoadInt32(&n)), 5)

err = sched.PauseJob(jobDetail.JobKey())
assert.Equal(t, err, nil)

time.Sleep(55 * time.Millisecond)
assert.Equal(t, int(atomic.LoadInt32(&n)), 5)

err = sched.ResumeJob(jobDetail.JobKey())
assert.Equal(t, err, nil)

time.Sleep(55 * time.Millisecond)
assert.Equal(t, int(atomic.LoadInt32(&n)), 10)

sched.Stop()
}

func TestScheduler_PauseResumeErrors(t *testing.T) {
funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
return "ok", nil
})
sched := quartz.NewStdScheduler()
jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob"))
err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(10*time.Millisecond))
assert.Equal(t, err, nil)

err = sched.ResumeJob(jobDetail.JobKey())
assert.NotEqual(t, err, nil)
err = sched.ResumeJob(quartz.NewJobKey("funcJob2"))
assert.NotEqual(t, err, nil)

err = sched.PauseJob(jobDetail.JobKey())
assert.Equal(t, err, nil)
err = sched.PauseJob(jobDetail.JobKey())
assert.NotEqual(t, err, nil)
err = sched.PauseJob(quartz.NewJobKey("funcJob2"))
assert.NotEqual(t, err, nil)

sched.Stop()
}

func TestScheduler_ArgumentValidationErrors(t *testing.T) {
sched := quartz.NewStdScheduler()
job := job.NewShellJob("ls -la")
trigger := quartz.NewRunOnceTrigger(time.Millisecond)
Expand All @@ -393,11 +449,19 @@ func TestSchedulerArgumentValidationErrors(t *testing.T) {
err = sched.DeleteJob(nil)
assert.ErrorContains(t, err, "jobKey is nil")

err = sched.PauseJob(nil)
assert.ErrorContains(t, err, "jobKey is nil")

err = sched.ResumeJob(nil)
assert.ErrorContains(t, err, "jobKey is nil")

_, err = sched.GetScheduledJob(nil)
assert.ErrorContains(t, err, "jobKey is nil")

sched.Stop()
}

func TestSchedulerStartStop(t *testing.T) {
func TestScheduler_StartStop(t *testing.T) {
sched := quartz.NewStdScheduler()
ctx := context.Background()
sched.Start(ctx)
Expand Down

0 comments on commit 028b631

Please sign in to comment.