Skip to content

Commit

Permalink
implements a mosquito api for reading queues and fetching job runs fr…
Browse files Browse the repository at this point in the history
…om them
  • Loading branch information
robacarp committed Nov 3, 2024
1 parent e20a4b9 commit e7220cd
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 0 deletions.
57 changes: 57 additions & 0 deletions spec/mosquito/api/queue_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
require "../../spec_helper"

describe Mosquito::Api::JobRun do
let(job_classes) {
[QueuedTestJob, PassingJob, FailingJob, QueueHookedTestJob]
}
let(queued_test_job) { QueuedTestJob.new }
let(passing_job) { PassingJob.new }

it "can fetch a list of current queues" do
clean_slate do
queued_test_job.enqueue
passing_job.enqueue
expected_queues = ["queued_test_job", "passing_job"].sort
queues = Mosquito::Api::Queue.all
assert_equal 2, queues.size
assert_equal expected_queues, queues.map(&.name).sort
end
end

it "can fetch the size of a queue" do
clean_slate do
job_classes.map(&.new).each(&.enqueue)
queues = Mosquito::Api::Queue.all
queues.each do |queue|
assert_equal 1, queue.size
end
end
end

it "can fetch the size details of a queue" do
clean_slate do
job_classes.map(&.new).each(&.enqueue)
queues = Mosquito::Api::Queue.all
sizes = queues.map(&.size_details)
sizes.each do |size|
assert_equal 1, size["waiting"]
assert_equal 0, size["scheduled"]
assert_equal 0, size["pending"]
assert_equal 0, size["dead"]
end
end
end

it "can fetch job runs from a queue" do
clean_slate do
job_classes.each do |job_class|
job = job_class.new
job.enqueue
api = Mosquito::Api::Queue.new job_class.queue.name
job_runs = api.waiting_job_runs
assert_equal 1, job_runs.size
assert_equal job.class.name.underscore, job_runs.first.type
end
end
end
end
6 changes: 6 additions & 0 deletions src/mosquito/api.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ module Mosquito::Api
def self.job_run(id : String) : JobRun
JobRun.new id
end

def self.list_queues : Array(Observability::Queue)
Mosquito.backend.list_queues
.map { |name| Observability::Queue.new name }
end

def self.list_overseers : Array(Overseer)
Mosquito.backend.list_overseers
.map { |name| Overseer.new name }
Expand Down
38 changes: 38 additions & 0 deletions src/mosquito/api/queue.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
module Mosquito::Api
class Queue
getter name : String

private property backend : Mosquito::Backend

def self.all : Array(Queue)
Mosquito.backend.list_queues.map { |name| new name }
end

def initialize(@name)
@backend = Mosquito.backend.named name
end

{% for name in Mosquito::Backend::QUEUES %}
def {{name.id}}_job_runs : Array(JobRun)
backend.dump_{{name.id}}_q
.map { |task_id| JobRun.new task_id }
end
{% end %}

def size : Int64
backend.size(include_dead: false)
end

def size_details : Hash(String, Int64)
sizes = {} of String => Int64
{% for name in Mosquito::Backend::QUEUES %}
sizes["{{name.id}}"] = backend.{{name.id}}_size
{% end %}
sizes
end

def <=>(other)
name <=> other.name
end
end
end
16 changes: 16 additions & 0 deletions src/mosquito/redis_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,22 @@ module Mosquito
raise "don't know how to dump a #{type} for {{name.id}}"
end
end

def {{name.id}}_size : Int64
key = {{name.id}}_q
type = redis.type key

case type
when "list"
redis.llen(key).as(Int64)
when "zset"
redis.zcount(key, "0", "+inf").as(Int64)
when "none"
0_i64
else
raise "don't know how to {{name.id}}_size (redis type is a #{type})."
end
end
{% end %}

def scheduled_job_run_time(job_run : JobRun) : String?
Expand Down
4 changes: 4 additions & 0 deletions src/mosquito/test_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ module Mosquito
def dump_{{name.id}}_q : Array(String)
[] of String
end

def {{name.id}}_size : Int64
0_i64
end
{% end %}

def scheduled_job_run_time(job_run : JobRun) : String?
Expand Down

0 comments on commit e7220cd

Please sign in to comment.