diff --git a/instrumentation/que/lib/opentelemetry/instrumentation/que/patches/que_job.rb b/instrumentation/que/lib/opentelemetry/instrumentation/que/patches/que_job.rb index 5be898984..a189f5aee 100644 --- a/instrumentation/que/lib/opentelemetry/instrumentation/que/patches/que_job.rb +++ b/instrumentation/que/lib/opentelemetry/instrumentation/que/patches/que_job.rb @@ -19,6 +19,10 @@ class << base # Module to prepend to Que singleton class module ClassMethods def enqueue(*args, job_options: {}, **arg_opts) + # In Que version 2.1.0 `bulk_enqueue` was introduced. + # In that case, the span is created inside the `bulk_enqueue` method. + return super(*args, **arg_opts) if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert] + tracer = Que::Instrumentation.instance.tracer otel_config = Que::Instrumentation.instance.config @@ -43,19 +47,8 @@ def enqueue(*args, job_options: {}, **arg_opts) OpenTelemetry.propagation.inject(tags, setter: TagSetter) end - # In Que version 2.1.0 `bulk_enqueue` was introduced and in order - # for it to work, we must pass `job_options` to `bulk_enqueue` instead of enqueue. - if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert] - Thread.current[:que_jobs_to_bulk_insert][:job_options] = Thread.current[:que_jobs_to_bulk_insert][:job_options]&.merge(tags: tags) do |_, a, b| - a.is_a?(Array) && b.is_a?(Array) ? a.concat(b) : b - end - - job = super(*args, **arg_opts) - job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs].last - else - job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts) - job_attrs = job.que_attrs - end + job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts) + job_attrs = job.que_attrs span.name = "#{job_attrs[:job_class]} publish" span.add_attributes(QueJob.job_attributes(job_attrs)) @@ -67,6 +60,32 @@ def enqueue(*args, job_options: {}, **arg_opts) def gem_version @gem_version ||= Gem.loaded_specs['que'].version end + + if Gem.loaded_specs['que'].version >= Gem::Version.new('2.1.0') + def bulk_enqueue(**_kwargs, &block) + tracer = Que::Instrumentation.instance.tracer + otel_config = Que::Instrumentation.instance.config + + tracer.in_span('publish', kind: :producer) do |span| + super do + yield + + job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] + + unless job_attrs.empty? + span.name = "#{job_attrs.first[:job_class]} publish" + span.add_attributes(QueJob.job_attributes(job_attrs.first)) + end + + if otel_config[:propagation_style] != :none + job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options] + job_options[:tags] ||= [] + OpenTelemetry.propagation.inject(job_options[:tags], setter: TagSetter) + end + end + end + end + end end def self.job_attributes(job_attrs) diff --git a/instrumentation/que/test/opentelemetry/instrumentation/que_test.rb b/instrumentation/que/test/opentelemetry/instrumentation/que_test.rb index 9ec9847c1..7222e82cd 100644 --- a/instrumentation/que/test/opentelemetry/instrumentation/que_test.rb +++ b/instrumentation/que/test/opentelemetry/instrumentation/que_test.rb @@ -304,6 +304,74 @@ def self.run(first, second); end end end + describe 'enqueueing multiple jobs' do + it 'creates a span' do + Que.bulk_enqueue do + 10.times { TestJobAsync.enqueue } + end + + _(finished_spans.size).must_equal(1) + + span = finished_spans.last + _(span.kind).must_equal(:producer) + end + + it 'names the created span' do + Que.bulk_enqueue do + 10.times { TestJobAsync.enqueue } + end + + span = finished_spans.last + _(span.name).must_equal('TestJobAsync publish') + end + + it 'links spans together' do + bulk_jobs = Que.bulk_enqueue do + 10.times { TestJobAsync.enqueue } + end + + bulk_jobs.each { |job| Que.run_job_middleware(job) { job.tap(&:_run) } } + + _(finished_spans.size).must_equal(11) + + publish_span = finished_spans.first + + process_spans = finished_spans.drop(1) + + process_spans.each do |process_span| + _(publish_span.trace_id).wont_equal(process_span.trace_id) + + _(process_span.total_recorded_links).must_equal(1) + _(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) + _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) + end + end + + it 'records attributes' do + Que.bulk_enqueue do + 10.times { TestJobAsync.enqueue } + end + + attributes = finished_spans.last.attributes + _(attributes['messaging.system']).must_equal('que') + _(attributes['messaging.destination']).must_equal('default') + _(attributes['messaging.destination_kind']).must_equal('queue') + _(attributes['messaging.operation']).must_equal('publish') + _(attributes['messaging.que.job_class']).must_equal('TestJobAsync') + _(attributes['messaging.que.priority']).must_equal(100) + _(attributes.key?('messaging.message_id')).must_equal(false) + end + end + + describe 'enqueueing zero jobs' do + it 'creates a span' do + Que.bulk_enqueue do + end + + _(finished_spans.size).must_equal(1) + end + end + describe 'processing a job' do before do bulk_job = Que.bulk_enqueue do @@ -363,10 +431,10 @@ def self.run(first, second); end _(finished_spans.size).must_equal(2) - span1 = finished_spans.first + span1 = finished_spans.last _(span1.kind).must_equal(:producer) - span2 = finished_spans.last + span2 = finished_spans.first _(span2.kind).must_equal(:consumer) end @@ -375,10 +443,10 @@ def self.run(first, second); end TestJobSync.enqueue end - span1 = finished_spans.first + span1 = finished_spans.last _(span1.name).must_equal('TestJobSync publish') - span2 = finished_spans.last + span2 = finished_spans.first _(span2.name).must_equal('TestJobSync process') end @@ -387,7 +455,7 @@ def self.run(first, second); end TestJobSync.enqueue end - attributes = finished_spans.last.attributes + attributes = finished_spans.first.attributes _(attributes['messaging.system']).must_equal('que') _(attributes['messaging.destination']).must_equal('default') _(attributes['messaging.destination_kind']).must_equal('queue')