-
Notifications
You must be signed in to change notification settings - Fork 170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(que): Fix bulk_enqueue when enqueuing more than 5 jobs #1074
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -304,6 +304,74 @@ def self.run(first, second); end | |
end | ||
end | ||
|
||
describe 'enqueueing multiple jobs' do | ||
it 'creates a span' do | ||
Que.bulk_enqueue do | ||
10.times { TestJobAsync.enqueue } | ||
end | ||
|
||
_(finished_spans.size).must_equal(1) | ||
|
||
span = finished_spans.last | ||
_(span.kind).must_equal(:producer) | ||
end | ||
|
||
it 'names the created span' do | ||
Que.bulk_enqueue do | ||
10.times { TestJobAsync.enqueue } | ||
end | ||
|
||
span = finished_spans.last | ||
_(span.name).must_equal('TestJobAsync publish') | ||
end | ||
|
||
it 'links spans together' do | ||
bulk_jobs = Que.bulk_enqueue do | ||
10.times { TestJobAsync.enqueue } | ||
end | ||
|
||
bulk_jobs.each { |job| Que.run_job_middleware(job) { job.tap(&:_run) } } | ||
|
||
_(finished_spans.size).must_equal(11) | ||
|
||
publish_span = finished_spans.first | ||
|
||
process_spans = finished_spans.drop(1) | ||
|
||
process_spans.each do |process_span| | ||
_(publish_span.trace_id).wont_equal(process_span.trace_id) | ||
|
||
_(process_span.total_recorded_links).must_equal(1) | ||
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) | ||
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) | ||
end | ||
end | ||
|
||
it 'records attributes' do | ||
Que.bulk_enqueue do | ||
10.times { TestJobAsync.enqueue } | ||
end | ||
|
||
attributes = finished_spans.last.attributes | ||
_(attributes['messaging.system']).must_equal('que') | ||
_(attributes['messaging.destination']).must_equal('default') | ||
_(attributes['messaging.destination_kind']).must_equal('queue') | ||
_(attributes['messaging.operation']).must_equal('publish') | ||
_(attributes['messaging.que.job_class']).must_equal('TestJobAsync') | ||
_(attributes['messaging.que.priority']).must_equal(100) | ||
_(attributes.key?('messaging.message_id')).must_equal(false) | ||
end | ||
end | ||
|
||
describe 'enqueueing zero jobs' do | ||
it 'creates a span' do | ||
Que.bulk_enqueue do | ||
end | ||
|
||
_(finished_spans.size).must_equal(1) | ||
end | ||
end | ||
|
||
describe 'processing a job' do | ||
before do | ||
bulk_job = Que.bulk_enqueue do | ||
|
@@ -363,10 +431,10 @@ def self.run(first, second); end | |
|
||
_(finished_spans.size).must_equal(2) | ||
|
||
span1 = finished_spans.first | ||
span1 = finished_spans.last | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. did something change with ordering? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, forgot to mention this in commit message. It affects the "synchronous" mode test where previously the "produce" span finished first and only then "consume" finished. Now they finish in opposite order. I do not know what is the desired behavior, not sure if it's even important here what order they are in. But observe that now the behavior is identical to the normal enqueue for synchronous jobs. Here is same test for regular enqueue and observe that now the two tests are identical. |
||
_(span1.kind).must_equal(:producer) | ||
|
||
span2 = finished_spans.last | ||
span2 = finished_spans.first | ||
_(span2.kind).must_equal(:consumer) | ||
end | ||
|
||
|
@@ -375,10 +443,10 @@ def self.run(first, second); end | |
TestJobSync.enqueue | ||
end | ||
|
||
span1 = finished_spans.first | ||
span1 = finished_spans.last | ||
_(span1.name).must_equal('TestJobSync publish') | ||
|
||
span2 = finished_spans.last | ||
span2 = finished_spans.first | ||
_(span2.name).must_equal('TestJobSync process') | ||
end | ||
|
||
|
@@ -387,7 +455,7 @@ def self.run(first, second); end | |
TestJobSync.enqueue | ||
end | ||
|
||
attributes = finished_spans.last.attributes | ||
attributes = finished_spans.first.attributes | ||
_(attributes['messaging.system']).must_equal('que') | ||
_(attributes['messaging.destination']).must_equal('default') | ||
_(attributes['messaging.destination_kind']).must_equal('queue') | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it add
messaging.message_id
to the attributes? It shouldn't in case of a bulk enqueue. The other properties seem correct because they should be the same according to the bulk enqueue limitations.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The job attributes do not contain ID in the case of bulk enqueue, so no ID is added to attributes.
This is so because IDs only exist after a job is inserted into database but for bulk enqueue we set the attributes before they are inserted into database.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will see if I can also add a test to assert that message ID is not added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a test case to check lack of message ID attribute and presence of other attributes