Skip to content

Commit

Permalink
feat(queue): handle errors in ScheduledJobs and Size methods (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Mar 29, 2024
1 parent c63c855 commit 375ceeb
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 39 deletions.
31 changes: 23 additions & 8 deletions examples/queue/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ func main() {
}, jobQueue)
scheduler.Start(ctx)

if jobQueue.Size() == 0 {
jobQueueSize, err := jobQueue.Size()
if err != nil {
logger.Errorf("Failed to fetch job queue size: %s", err)
return
}

if jobQueueSize == 0 {
logger.Info("Scheduling new jobs")
jobDetail1 := quartz.NewJobDetail(&printJob{5}, quartz.NewJobKey("job1"))
if err := scheduler.ScheduleJob(jobDetail1, quartz.NewSimpleTrigger(5*time.Second)); err != nil {
Expand All @@ -58,7 +64,11 @@ func main() {

<-ctx.Done()

scheduledJobs := jobQueue.ScheduledJobs(nil)
scheduledJobs, err := jobQueue.ScheduledJobs(nil)
if err != nil {
logger.Errorf("Failed to fetch scheduled jobs: %s", err)
return
}
jobNames := make([]string, 0, len(scheduledJobs))
for _, job := range scheduledJobs {
jobNames = append(jobNames, job.JobDetail().JobKey().String())
Expand Down Expand Up @@ -289,14 +299,16 @@ func (jq *jobQueue) Remove(jobKey *quartz.JobKey) (quartz.ScheduledJob, error) {
}

// ScheduledJobs returns the slice of all scheduled jobs in the queue.
func (jq *jobQueue) ScheduledJobs(matchers []quartz.Matcher[quartz.ScheduledJob]) []quartz.ScheduledJob {
func (jq *jobQueue) ScheduledJobs(
matchers []quartz.Matcher[quartz.ScheduledJob],
) ([]quartz.ScheduledJob, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
logger.Trace("ScheduledJobs")
var jobs []quartz.ScheduledJob
fileInfo, err := os.ReadDir(dataFolder)
if err != nil {
return jobs
return nil, err
}
for _, file := range fileInfo {
if !file.IsDir() {
Expand All @@ -309,7 +321,7 @@ func (jq *jobQueue) ScheduledJobs(matchers []quartz.Matcher[quartz.ScheduledJob]
}
}
}
return jobs
return jobs, nil
}

func isMatch(job quartz.ScheduledJob, matchers []quartz.Matcher[quartz.ScheduledJob]) bool {
Expand All @@ -323,12 +335,15 @@ func isMatch(job quartz.ScheduledJob, matchers []quartz.Matcher[quartz.Scheduled
}

// Size returns the size of the job queue.
func (jq *jobQueue) Size() int {
func (jq *jobQueue) Size() (int, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
logger.Trace("Size")
files, _ := os.ReadDir(dataFolder)
return len(files)
files, err := os.ReadDir(dataFolder)
if err != nil {
return 0, err
}
return len(files), nil
}

// Clear clears the job queue.
Expand Down
16 changes: 9 additions & 7 deletions quartz/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ type JobQueue interface {
Push(job ScheduledJob) error

// Pop removes and returns the next to run scheduled job from the queue.
// Implementations should return quartz.ErrQueueEmpty if the queue is empty.
Pop() (ScheduledJob, error)

// Head returns the first scheduled job without removing it from the queue.
// Implementations should return quartz.ErrQueueEmpty if the queue is empty.
Head() (ScheduledJob, error)

// Get returns the scheduled job with the specified key without removing it
Expand All @@ -75,10 +77,10 @@ type JobQueue interface {
// // ... WHERE group_name = m.Pattern
// }
// }
ScheduledJobs([]Matcher[ScheduledJob]) []ScheduledJob
ScheduledJobs([]Matcher[ScheduledJob]) ([]ScheduledJob, error)

// Size returns the size of the job queue.
Size() int
Size() (int, error)

// Clear clears the job queue.
Clear() error
Expand Down Expand Up @@ -212,11 +214,11 @@ func (jq *jobQueue) Remove(jobKey *JobKey) (ScheduledJob, error) {
// ScheduledJobs returns a slice of scheduled jobs in the queue.
// For a job to be returned, it must satisfy all of the specified matchers.
// Given an empty matchers it returns all scheduled jobs.
func (jq *jobQueue) ScheduledJobs(matchers []Matcher[ScheduledJob]) []ScheduledJob {
func (jq *jobQueue) ScheduledJobs(matchers []Matcher[ScheduledJob]) ([]ScheduledJob, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
if len(matchers) == 0 {
return jq.scheduledJobs()
return jq.scheduledJobs(), nil
}
matchedJobs := make([]ScheduledJob, 0)
JobLoop:
Expand All @@ -229,7 +231,7 @@ JobLoop:
}
matchedJobs = append(matchedJobs, job)
}
return matchedJobs
return matchedJobs, nil
}

// scheduledJobs returns all scheduled jobs.
Expand All @@ -242,10 +244,10 @@ func (jq *jobQueue) scheduledJobs() []ScheduledJob {
}

// Size returns the size of the job queue.
func (jq *jobQueue) Size() int {
func (jq *jobQueue) Size() (int, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
return len(jq.delegate)
return len(jq.delegate), nil
}

// Clear clears the job queue.
Expand Down
50 changes: 26 additions & 24 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) []*JobK
sched.mtx.Lock()
defer sched.mtx.Unlock()

scheduledJobs := sched.queue.ScheduledJobs(matchers)
scheduledJobs, _ := sched.queue.ScheduledJobs(matchers)
keys := make([]*JobKey, 0, len(scheduledJobs))
for _, scheduled := range scheduledJobs {
keys = append(keys, scheduled.JobDetail().jobKey)
Expand Down Expand Up @@ -382,31 +382,33 @@ func (sched *StdScheduler) Stop() {

func (sched *StdScheduler) startExecutionLoop(ctx context.Context) {
defer sched.wg.Done()
maxTimerDuration := time.Duration(1<<63 - 1)
timer := time.NewTimer(maxTimerDuration)
for {
if sched.queue.Size() == 0 {
select {
case <-sched.interrupt:
logger.Trace("Interrupted in empty queue.")
case <-ctx.Done():
logger.Info("Exit the empty execution loop.")
return
}
} else {
timer := time.NewTimer(sched.calculateNextTick())
select {
case <-timer.C:
logger.Trace("Tick.")
sched.executeAndReschedule(ctx)
queueSize, err := sched.queue.Size()
switch {
case err != nil:
logger.Errorf("Failed to fetch queue size: %s", err)
timer.Reset(sched.opts.RetryInterval)
case queueSize == 0:
logger.Trace("Queue is empty.")
timer.Reset(maxTimerDuration)
default:
timer.Reset(sched.calculateNextTick())
}
select {
case <-timer.C:
logger.Trace("Tick.")
sched.executeAndReschedule(ctx)

case <-sched.interrupt:
logger.Trace("Interrupted waiting for next tick.")
timer.Stop()
case <-sched.interrupt:
logger.Trace("Interrupted waiting for next tick.")
timer.Stop()

case <-ctx.Done():
logger.Info("Exit the execution loop.")
timer.Stop()
return
}
case <-ctx.Done():
logger.Info("Exit the execution loop.")
timer.Stop()
return
}
}
}
Expand Down Expand Up @@ -439,7 +441,7 @@ func (sched *StdScheduler) calculateNextTick() time.Duration {
logger.Debug("Queue is empty")
return nextTickDuration
}
logger.Warnf("Failed to calculate next tick: %s", err)
logger.Errorf("Failed to calculate next tick: %s", err)
return sched.opts.RetryInterval
}
nextRunTime := scheduledJob.NextRunTime()
Expand Down

0 comments on commit 375ceeb

Please sign in to comment.