Skip to content

Commit

Permalink
fix(scheduler): rescheduling outdated jobs (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jan 16, 2024
1 parent b51533b commit dc17340
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 36 deletions.
48 changes: 25 additions & 23 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,13 @@ func (sched *StdScheduler) calculateNextTick() time.Duration {
logger.Warnf("Failed to calculate next tick for %s, err: %s",
scheduledJob.JobDetail().jobKey, err)
} else {
var nextTick int64
var nextTickDuration time.Duration
nextRunTime := scheduledJob.NextRunTime()
now := NowNano()
if nextRunTime > now {
nextTick = nextRunTime - now
nextTickDuration = time.Duration(nextRunTime - now)
}
nextTickDuration := time.Duration(nextTick)
logger.Debugf("Next tick for %s in %s.", scheduledJob.JobDetail().jobKey,
logger.Tracef("Next tick is for %s in %s.", scheduledJob.JobDetail().jobKey,
nextTickDuration)
return nextTickDuration
}
Expand All @@ -364,18 +363,15 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
logger.Errorf("Failed to fetch a job from the queue: %s", err)
return
}
// try rescheduling the job immediately
sched.rescheduleJob(ctx, scheduled)

now := NowNano()
// check if the job is due to be processed
if scheduled.NextRunTime() > now {
logger.Tracef("Job %s is not due to run yet.", scheduled.JobDetail().jobKey)
return
}
// validate the job
valid, nextRunTimeExtractor := sched.validateJob(scheduled)

// try rescheduling the job immediately
sched.rescheduleJob(ctx, scheduled, nextRunTimeExtractor)

// execute the job
if sched.jobIsUpToDate(scheduled, now) {
if valid {
logger.Debugf("Job %s is about to be executed.", scheduled.JobDetail().jobKey)
switch {
case sched.opts.BlockingExecution:
Expand All @@ -393,9 +389,6 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
executeWithRetries(ctx, scheduled.JobDetail())
}()
}
} else {
logger.Debugf("Job %s skipped as outdated %s.", scheduled.JobDetail().jobKey,
time.Duration(now-scheduled.NextRunTime()))
}
}

Expand Down Expand Up @@ -424,13 +417,26 @@ retryLoop:
}
}

func (sched *StdScheduler) rescheduleJob(ctx context.Context, job ScheduledJob) {
nextRunTime, err := job.Trigger().NextFireTime(job.NextRunTime())
func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, error)) {
now := NowNano()
if job.NextRunTime() < now-sched.opts.OutdatedThreshold.Nanoseconds() {
duration := time.Duration(now - job.NextRunTime())
logger.Debugf("Job %s skipped as outdated %s.", job.JobDetail().jobKey, duration)
return false, func() (int64, error) { return job.Trigger().NextFireTime(now) }
} else if job.NextRunTime() > now {
logger.Debugf("Job %s is not due to run yet.", job.JobDetail().jobKey)
return false, func() (int64, error) { return job.NextRunTime(), nil }
}
return true, func() (int64, error) { return job.Trigger().NextFireTime(job.NextRunTime()) }
}

func (sched *StdScheduler) rescheduleJob(ctx context.Context, job ScheduledJob,
nextRunTimeExtractor func() (int64, error)) {
nextRunTime, err := nextRunTimeExtractor()
if err != nil {
logger.Infof("Job %s exited the execution loop: %s.", job.JobDetail().jobKey, err)
return
}

select {
case <-ctx.Done():
case sched.feeder <- &scheduledJob{
Expand All @@ -441,10 +447,6 @@ func (sched *StdScheduler) rescheduleJob(ctx context.Context, job ScheduledJob)
}
}

func (sched *StdScheduler) jobIsUpToDate(job ScheduledJob, now int64) bool {
return job.NextRunTime() > now-sched.opts.OutdatedThreshold.Nanoseconds()
}

func (sched *StdScheduler) startFeedReader(ctx context.Context) {
defer sched.wg.Done()
for {
Expand Down
1 change: 0 additions & 1 deletion quartz/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func TestSchedulerBlockingSemantics(t *testing.T) {
}
})
}

}

func TestSchedulerCancel(t *testing.T) {
Expand Down
16 changes: 4 additions & 12 deletions quartz/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func indexes(search []string, target []string) ([]int, error) {
}
searchIndexes = append(searchIndexes, index)
}

return searchIndexes, nil
}

Expand All @@ -43,10 +42,8 @@ func sliceAtoi(sa []string) ([]int, error) {
if err != nil {
return si, err
}

si = append(si, i)
}

return si, nil
}

Expand All @@ -60,7 +57,6 @@ func fillRangeValues(from, to int) ([]int, error) {
for i, j := from, 0; i <= to; i, j = i+1, j+1 {
rangeValues[j] = i
}

return rangeValues, nil
}

Expand All @@ -74,7 +70,6 @@ func fillStepValues(from, step, max int) ([]int, error) {
for i, j := from, 0; i <= max; i, j = i+step, j+1 {
stepValues[j] = i
}

return stepValues, nil
}

Expand All @@ -83,26 +78,23 @@ func normalize(field string, dict []string) int {
if err == nil {
return i
}

return intVal(dict, field)
}

func inScope(i, min, max int) bool {
if i >= min && i <= max {
return true
}

return false
}

func intVal(target []string, search string) int {
uSearch := strings.ToUpper(search)
for i, v := range target {
if v == uSearch {
func intVal(source []string, target string) int {
upperCaseTarget := strings.ToUpper(target)
for i, v := range source {
if v == upperCaseTarget {
return i
}
}

return -1 // TODO: return error
}

Expand Down

0 comments on commit dc17340

Please sign in to comment.