Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(que): Fix bulk_enqueue when enqueuing more than 5 jobs #1074

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class << base
# Module to prepend to Que singleton class
module ClassMethods
def enqueue(*args, job_options: {}, **arg_opts)
# In Que version 2.1.0 `bulk_enqueue` was introduced.
# In that case, the span is created inside the `bulk_enqueue` method.
return super(*args, **arg_opts) if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]

tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

Expand All @@ -43,19 +47,8 @@ def enqueue(*args, job_options: {}, **arg_opts)
OpenTelemetry.propagation.inject(tags, setter: TagSetter)
end

# In Que version 2.1.0 `bulk_enqueue` was introduced and in order
# for it to work, we must pass `job_options` to `bulk_enqueue` instead of enqueue.
if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]
Thread.current[:que_jobs_to_bulk_insert][:job_options] = Thread.current[:que_jobs_to_bulk_insert][:job_options]&.merge(tags: tags) do |_, a, b|
a.is_a?(Array) && b.is_a?(Array) ? a.concat(b) : b
end

job = super(*args, **arg_opts)
job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs].last
else
job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
job_attrs = job.que_attrs
end
job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
job_attrs = job.que_attrs

span.name = "#{job_attrs[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs))
Expand All @@ -67,6 +60,32 @@ def enqueue(*args, job_options: {}, **arg_opts)
def gem_version
@gem_version ||= Gem.loaded_specs['que'].version
end

if Gem.loaded_specs['que'].version >= Gem::Version.new('2.1.0')
def bulk_enqueue(**_kwargs, &block)
tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

tracer.in_span('publish', kind: :producer) do |span|
super do
yield

job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]

unless job_attrs.empty?
span.name = "#{job_attrs.first[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs.first))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it add messaging.message_id to the attributes? It shouldn't in case of a bulk enqueue. The other properties seem correct because they should be the same according to the bulk enqueue limitations.

Copy link
Contributor Author

@laurglia laurglia Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job attributes do not contain ID in the case of bulk enqueue, so no ID is added to attributes.

This is so because IDs only exist after a job is inserted into database but for bulk enqueue we set the attributes before they are inserted into database.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will see if I can also add a test to assert that message ID is not added

Copy link
Contributor Author

@laurglia laurglia Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test case to check lack of message ID attribute and presence of other attributes

end

if otel_config[:propagation_style] != :none
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
job_options[:tags] ||= []
OpenTelemetry.propagation.inject(job_options[:tags], setter: TagSetter)
end
end
end
end
end
end

def self.job_attributes(job_attrs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,74 @@ def self.run(first, second); end
end
end

describe 'enqueueing multiple jobs' do
it 'creates a span' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

_(finished_spans.size).must_equal(1)

span = finished_spans.last
_(span.kind).must_equal(:producer)
end

it 'names the created span' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

span = finished_spans.last
_(span.name).must_equal('TestJobAsync publish')
end

it 'links spans together' do
bulk_jobs = Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

bulk_jobs.each { |job| Que.run_job_middleware(job) { job.tap(&:_run) } }

_(finished_spans.size).must_equal(11)

publish_span = finished_spans.first

process_spans = finished_spans.drop(1)

process_spans.each do |process_span|
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end

it 'records attributes' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

attributes = finished_spans.last.attributes
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
_(attributes['messaging.operation']).must_equal('publish')
_(attributes['messaging.que.job_class']).must_equal('TestJobAsync')
_(attributes['messaging.que.priority']).must_equal(100)
_(attributes.key?('messaging.message_id')).must_equal(false)
end
end

describe 'enqueueing zero jobs' do
it 'creates a span' do
Que.bulk_enqueue do
end

_(finished_spans.size).must_equal(1)
end
end

describe 'processing a job' do
before do
bulk_job = Que.bulk_enqueue do
Expand Down Expand Up @@ -363,10 +431,10 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

span1 = finished_spans.first
span1 = finished_spans.last
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did something change with ordering?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, forgot to mention this in commit message. It affects the "synchronous" mode test where previously the "produce" span finished first and only then "consume" finished. Now they finish in opposite order.

I do not know what is the desired behavior, not sure if it's even important here what order they are in.

But observe that now the behavior is identical to the normal enqueue for synchronous jobs. Here is same test for regular enqueue and observe that now the two tests are identical.

_(span1.kind).must_equal(:producer)

span2 = finished_spans.last
span2 = finished_spans.first
_(span2.kind).must_equal(:consumer)
end

Expand All @@ -375,10 +443,10 @@ def self.run(first, second); end
TestJobSync.enqueue
end

span1 = finished_spans.first
span1 = finished_spans.last
_(span1.name).must_equal('TestJobSync publish')

span2 = finished_spans.last
span2 = finished_spans.first
_(span2.name).must_equal('TestJobSync process')
end

Expand All @@ -387,7 +455,7 @@ def self.run(first, second); end
TestJobSync.enqueue
end

attributes = finished_spans.last.attributes
attributes = finished_spans.first.attributes
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
Expand Down
Loading