Skip to content

Commit

Permalink
feat: add Get method to the JobQueue interface
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Feb 2, 2024
1 parent ce9e1c4 commit 79c9f3a
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 3 deletions.
26 changes: 26 additions & 0 deletions examples/queue/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,32 @@ func findHead() (quartz.ScheduledJob, error) {
return job, nil
}

// Get returns the scheduled job with the specified key without removing it
// from the queue.
func (jq *jobQueue) Get(jobKey *quartz.JobKey) (quartz.ScheduledJob, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
logger.Trace("Get")
fileInfo, err := os.ReadDir(dataFolder)
if err != nil {
return nil, err
}
for _, file := range fileInfo {
if !file.IsDir() {
data, err := os.ReadFile(fmt.Sprintf("%s/%s", dataFolder, file.Name()))
if err == nil {
job, err := unmarshal(data)
if err == nil {
if jobKey.Equals(job.JobDetail().JobKey()) {
return job, nil
}
}
}
}
}
return nil, errors.New("no jobs found")
}

// Remove removes and returns the scheduled job with the specified key.
func (jq *jobQueue) Remove(jobKey *quartz.JobKey) (quartz.ScheduledJob, error) {
jq.mtx.Lock()
Expand Down
7 changes: 7 additions & 0 deletions quartz/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
// Errors
var (
ErrIllegalArgument = errors.New("illegal argument")
ErrIllegalState = errors.New("illegal state")
ErrCronParse = errors.New("parse cron expression")
ErrJobNotFound = errors.New("job not found")
)
Expand All @@ -18,6 +19,12 @@ func illegalArgumentError(message string) error {
return fmt.Errorf("%w: %s", ErrIllegalArgument, message)
}

// illegalStateError returns an illegal state error with a custom
// error message, which unwraps to ErrIllegalState.
func illegalStateError(message string) error {
return fmt.Errorf("%w: %s", ErrIllegalState, message)
}

// cronParseError returns a cron parse error with a custom error message,
// which unwraps to ErrCronParse.
func cronParseError(message string) error {
Expand Down
9 changes: 9 additions & 0 deletions quartz/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ func TestIllegalArgumentError(t *testing.T) {
assert.Equal(t, err.Error(), fmt.Sprintf("%s: %s", ErrIllegalArgument, message))
}

func TestIllegalStateError(t *testing.T) {
message := "job already exists"
err := illegalStateError(message)
if !errors.Is(err, ErrIllegalState) {
t.Fatal("error must match ErrIllegalState")
}
assert.Equal(t, err.Error(), fmt.Sprintf("%s: %s", ErrIllegalState, message))
}

func TestCronParseError(t *testing.T) {
message := "invalid field"
err := cronParseError(message)
Expand Down
23 changes: 20 additions & 3 deletions quartz/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type JobQueue interface {
// Head returns the first scheduled job without removing it from the queue.
Head() (ScheduledJob, error)

// Get returns the scheduled job with the specified key without removing it
// from the queue.
Get(jobKey *JobKey) (ScheduledJob, error)

// Remove removes and returns the scheduled job with the specified key.
Remove(jobKey *JobKey) (ScheduledJob, error)

Expand Down Expand Up @@ -136,8 +140,8 @@ func (jq *jobQueue) Push(job ScheduledJob) error {
heap.Remove(&jq.delegate, i)
break
}
return fmt.Errorf("job with the key %s already exists",
job.JobDetail().jobKey)
return illegalStateError(fmt.Sprintf("job with the key %s already exists",
job.JobDetail().jobKey))
}
}
heap.Push(&jq.delegate, job)
Expand All @@ -164,6 +168,19 @@ func (jq *jobQueue) Head() (ScheduledJob, error) {
return jq.delegate[0], nil
}

// Get returns the scheduled job with the specified key without removing it
// from the queue.
func (jq *jobQueue) Get(jobKey *JobKey) (ScheduledJob, error) {
jq.mtx.Lock()
defer jq.mtx.Unlock()
for _, scheduled := range jq.delegate {
if scheduled.JobDetail().jobKey.Equals(jobKey) {
return scheduled, nil
}
}
return nil, jobNotFoundError(jobKey.String())
}

// Remove removes and returns the scheduled job with the specified key.
func (jq *jobQueue) Remove(jobKey *JobKey) (ScheduledJob, error) {
jq.mtx.Lock()
Expand All @@ -174,7 +191,7 @@ func (jq *jobQueue) Remove(jobKey *JobKey) (ScheduledJob, error) {
return heap.Remove(&jq.delegate, i).(ScheduledJob), nil
}
}
return nil, jobNotFoundError(fmt.Sprintf("for key %s", jobKey))
return nil, jobNotFoundError(jobKey.String())
}

// ScheduledJobs returns the slice of all scheduled jobs in the queue.
Expand Down

0 comments on commit 79c9f3a

Please sign in to comment.