From 6bcf070e431a7f5cb29a3aaac0e773a3598e1493 Mon Sep 17 00:00:00 2001 From: Eugene R Date: Fri, 2 Feb 2024 22:43:40 +0200 Subject: [PATCH] feat: add the ability to pause and resume jobs (#102) --- README.md | 7 +++ quartz/job_detail.go | 13 ++++-- quartz/scheduler.go | 97 +++++++++++++++++++++++++++++++++++----- quartz/scheduler_test.go | 76 ++++++++++++++++++++++++++++--- 4 files changed, 173 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index c70606a..a14d965 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,13 @@ type Scheduler interface { // scheduler's execution queue. DeleteJob(jobKey *JobKey) error + // PauseJob suspends the job with the specified key from being + // executed by the scheduler. + PauseJob(jobKey *JobKey) error + + // ResumeJob restarts the suspended job with the specified key. + ResumeJob(jobKey *JobKey) error + // Clear removes all of the scheduled jobs. Clear() error diff --git a/quartz/job_detail.go b/quartz/job_detail.go index ca0a796..bbf005f 100644 --- a/quartz/job_detail.go +++ b/quartz/job_detail.go @@ -1,6 +1,8 @@ package quartz -import "time" +import ( + "time" +) // JobDetailOptions represents additional JobDetail properties. type JobDetailOptions struct { @@ -13,19 +15,24 @@ type JobDetailOptions struct { // Default: 1 second. RetryInterval time.Duration - // Replace specifies whether the job should replace an existing job + // Replace indicates whether the job should replace an existing job // with the same key. // Default: false. Replace bool + + // Suspended indicates whether the job is paused. + // Default: false. + Suspended bool } // NewDefaultJobDetailOptions returns a new instance of JobDetailOptions // with the default values. func NewDefaultJobDetailOptions() *JobDetailOptions { - return &JobDetailOptions{ + return &JobDetailOptions{ // using explicit default values for visibility MaxRetries: 0, RetryInterval: time.Second, Replace: false, + Suspended: false, } } diff --git a/quartz/scheduler.go b/quartz/scheduler.go index 394ec0e..b9bae0b 100644 --- a/quartz/scheduler.go +++ b/quartz/scheduler.go @@ -3,6 +3,7 @@ package quartz import ( "context" "fmt" + "math" "sync" "time" @@ -42,6 +43,13 @@ type Scheduler interface { // scheduler's execution queue. DeleteJob(jobKey *JobKey) error + // PauseJob suspends the job with the specified key from being + // executed by the scheduler. + PauseJob(jobKey *JobKey) error + + // ResumeJob restarts the suspended job with the specified key. + ResumeJob(jobKey *JobKey) error + // Clear removes all of the scheduled jobs. Clear() error @@ -147,11 +155,14 @@ func (sched *StdScheduler) ScheduleJob( if trigger == nil { return illegalArgumentError("trigger is nil") } - nextRunTime, err := trigger.NextFireTime(NowNano()) - if err != nil { - return err + nextRunTime := int64(math.MaxInt64) + var err error + if !jobDetail.opts.Suspended { + nextRunTime, err = trigger.NextFireTime(NowNano()) + if err != nil { + return err + } } - toSchedule := &scheduledJob{ job: jobDetail, trigger: trigger, @@ -226,13 +237,7 @@ func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) if jobKey == nil { return nil, illegalArgumentError("jobKey is nil") } - scheduledJobs := sched.queue.ScheduledJobs() - for _, scheduled := range scheduledJobs { - if scheduled.JobDetail().jobKey.Equals(jobKey) { - return scheduled, nil - } - } - return nil, jobNotFoundError(fmt.Sprintf("for key %s", jobKey)) + return sched.queue.Get(jobKey) } // DeleteJob removes the Job with the specified key if present. @@ -250,6 +255,73 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error { return err } +// PauseJob suspends the job with the specified key from being +// executed by the scheduler. +func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { + if jobKey == nil { + return illegalArgumentError("jobKey is nil") + } + job, err := sched.queue.Get(jobKey) + if err != nil { + return err + } + if job.JobDetail().opts.Suspended { + return illegalStateError(fmt.Sprintf("job %s is suspended", jobKey)) + } + job, err = sched.queue.Remove(jobKey) + if err == nil { + job.JobDetail().opts.Suspended = true + paused := &scheduledJob{ + job: job.JobDetail(), + trigger: job.Trigger(), + priority: int64(math.MaxInt64), + } + err = sched.queue.Push(paused) + if err == nil { + logger.Debugf("Successfully paused job %s.", jobKey) + if sched.IsStarted() { + sched.reset() + } + } + } + return err +} + +// ResumeJob restarts the suspended job with the specified key. +func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { + if jobKey == nil { + return illegalArgumentError("jobKey is nil") + } + job, err := sched.queue.Get(jobKey) + if err != nil { + return err + } + if !job.JobDetail().opts.Suspended { + return illegalStateError(fmt.Sprintf("job %s is active", jobKey)) + } + job, err = sched.queue.Remove(jobKey) + if err == nil { + job.JobDetail().opts.Suspended = false + nextRunTime, err := job.Trigger().NextFireTime(NowNano()) + if err != nil { + return err + } + resumed := &scheduledJob{ + job: job.JobDetail(), + trigger: job.Trigger(), + priority: nextRunTime, + } + err = sched.queue.Push(resumed) + if err == nil { + logger.Debugf("Successfully resumed job %s.", jobKey) + if sched.IsStarted() { + sched.reset() + } + } + } + return err +} + // Clear removes all of the scheduled jobs. func (sched *StdScheduler) Clear() error { // reset the job queue @@ -418,6 +490,9 @@ retryLoop: } func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, error)) { + if job.JobDetail().opts.Suspended { + return false, func() (int64, error) { return math.MaxInt64, nil } + } now := NowNano() if job.NextRunTime() < now-sched.opts.OutdatedThreshold.Nanoseconds() { duration := time.Duration(now - job.NextRunTime()) diff --git a/quartz/scheduler_test.go b/quartz/scheduler_test.go index ae90c2a..7f85d26 100644 --- a/quartz/scheduler_test.go +++ b/quartz/scheduler_test.go @@ -88,7 +88,7 @@ func TestScheduler(t *testing.T) { assert.Equal(t, errCurlJob.JobStatus(), job.StatusFailure) } -func TestSchedulerBlockingSemantics(t *testing.T) { +func TestScheduler_BlockingSemantics(t *testing.T) { for _, tt := range []string{"Blocking", "NonBlocking", "WorkerSmall", "WorkerLarge"} { t.Run(tt, func(t *testing.T) { var opts quartz.StdSchedulerOptions @@ -194,7 +194,7 @@ func TestSchedulerBlockingSemantics(t *testing.T) { } } -func TestSchedulerCancel(t *testing.T) { +func TestScheduler_Cancel(t *testing.T) { hourJob := func(ctx context.Context) (bool, error) { timer := time.NewTimer(time.Hour) defer timer.Stop() @@ -286,7 +286,7 @@ func TestSchedulerCancel(t *testing.T) { } } -func TestSchedulerJobWithRetries(t *testing.T) { +func TestScheduler_JobWithRetries(t *testing.T) { var n int32 funcRetryJob := job.NewFunctionJob(func(_ context.Context) (string, error) { atomic.AddInt32(&n, 1) @@ -328,7 +328,7 @@ func TestSchedulerJobWithRetries(t *testing.T) { sched.Stop() } -func TestSchedulerJobWithRetriesCtxDone(t *testing.T) { +func TestScheduler_JobWithRetriesCtxDone(t *testing.T) { var n int32 funcRetryJob := job.NewFunctionJob(func(_ context.Context) (string, error) { atomic.AddInt32(&n, 1) @@ -372,7 +372,63 @@ func TestSchedulerJobWithRetriesCtxDone(t *testing.T) { sched.Stop() } -func TestSchedulerArgumentValidationErrors(t *testing.T) { +func TestScheduler_PauseResume(t *testing.T) { + var n int32 + funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) { + atomic.AddInt32(&n, 1) + return "ok", nil + }) + sched := quartz.NewStdScheduler() + jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob")) + err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(10*time.Millisecond)) + assert.Equal(t, err, nil) + + assert.Equal(t, int(atomic.LoadInt32(&n)), 0) + sched.Start(context.Background()) + + time.Sleep(55 * time.Millisecond) + assert.Equal(t, int(atomic.LoadInt32(&n)), 5) + + err = sched.PauseJob(jobDetail.JobKey()) + assert.Equal(t, err, nil) + + time.Sleep(55 * time.Millisecond) + assert.Equal(t, int(atomic.LoadInt32(&n)), 5) + + err = sched.ResumeJob(jobDetail.JobKey()) + assert.Equal(t, err, nil) + + time.Sleep(55 * time.Millisecond) + assert.Equal(t, int(atomic.LoadInt32(&n)), 10) + + sched.Stop() +} + +func TestScheduler_PauseResumeErrors(t *testing.T) { + funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) { + return "ok", nil + }) + sched := quartz.NewStdScheduler() + jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob")) + err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(10*time.Millisecond)) + assert.Equal(t, err, nil) + + err = sched.ResumeJob(jobDetail.JobKey()) + assert.NotEqual(t, err, nil) + err = sched.ResumeJob(quartz.NewJobKey("funcJob2")) + assert.NotEqual(t, err, nil) + + err = sched.PauseJob(jobDetail.JobKey()) + assert.Equal(t, err, nil) + err = sched.PauseJob(jobDetail.JobKey()) + assert.NotEqual(t, err, nil) + err = sched.PauseJob(quartz.NewJobKey("funcJob2")) + assert.NotEqual(t, err, nil) + + sched.Stop() +} + +func TestScheduler_ArgumentValidationErrors(t *testing.T) { sched := quartz.NewStdScheduler() job := job.NewShellJob("ls -la") trigger := quartz.NewRunOnceTrigger(time.Millisecond) @@ -393,11 +449,19 @@ func TestSchedulerArgumentValidationErrors(t *testing.T) { err = sched.DeleteJob(nil) assert.ErrorContains(t, err, "jobKey is nil") + err = sched.PauseJob(nil) + assert.ErrorContains(t, err, "jobKey is nil") + + err = sched.ResumeJob(nil) + assert.ErrorContains(t, err, "jobKey is nil") + _, err = sched.GetScheduledJob(nil) assert.ErrorContains(t, err, "jobKey is nil") + + sched.Stop() } -func TestSchedulerStartStop(t *testing.T) { +func TestScheduler_StartStop(t *testing.T) { sched := quartz.NewStdScheduler() ctx := context.Background() sched.Start(ctx)