From 67bda51cc5d59f74e53154d996a0153721510f54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Lavoie?= Date: Mon, 22 Jul 2024 11:50:02 -0400 Subject: [PATCH] feat(active-job): Normalize event messages --- .../instrumentation/active_job/handlers.rb | 4 +++- .../active_job/handlers/default.rb | 17 ++++++++++++++++- .../active_job/handlers/enqueue.rb | 17 ++++------------- .../active_job/handlers/perform.rb | 17 +++-------------- .../active_job/handlers/discard_test.rb | 2 +- .../active_job/handlers/retry_stopped_test.rb | 2 +- 6 files changed, 28 insertions(+), 31 deletions(-) diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index 7eafb94252..a88260e2ea 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -14,6 +14,8 @@ module Instrumentation module ActiveJob # Module that contains custom event handlers, which are used to generate spans per event module Handlers + EVENT_NAMESPACE = 'active_job'.freeze + module_function # Subscribes Event Handlers to relevant ActiveJob notifications @@ -57,7 +59,7 @@ def subscribe } @subscriptions = handlers_by_pattern.map do |key, handler| - ::ActiveSupport::Notifications.subscribe("#{key}.active_job", handler) + ::ActiveSupport::Notifications.subscribe("#{key}.#{EVENT_NAMESPACE}", handler) end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb index 692346aeb0..a5c007749f 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb @@ -40,7 +40,10 @@ def start(name, id, payload) # @param payload [Hash] containing job run information # @return [Hash] with the span and generated context tokens def start_span(name, _id, payload) - span = tracer.start_span(name, attributes: @mapper.call(payload)) + job = payload.fetch(:job) + event_name = name.delete_suffix(".#{EVENT_NAMESPACE}") + span_name = span_name(job, event_name) + span = tracer.start_span(span_name, attributes: @mapper.call(payload)) token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) { span: span, ctx_token: token } @@ -106,6 +109,18 @@ def on_exception(exception, span) def tracer OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance.tracer end + + private + + def span_name(job, event_name) + prefix = if @config[:span_naming] == :job_class + job.class.name + else + job.queue_name + end + + "#{prefix} #{event_name}" + end end end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb index 4b12917476..3a66d9d45f 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb @@ -10,6 +10,8 @@ module ActiveJob module Handlers # Handles `enqueue.active_job` and `enqueue_at.active_job` to generate egress spans class Enqueue < Default + EVENT_NAME = 'publish'.freeze + # Overrides the `Default#start_span` method to create an egress span # and registers it with the current context # @@ -19,22 +21,11 @@ class Enqueue < Default # @return [Hash] with the span and generated context tokens def start_span(name, _id, payload) job = payload.fetch(:job) - span = tracer.start_span(span_name(job), kind: :producer, attributes: @mapper.call(payload)) + span_name = span_name(job, EVENT_NAME) + span = tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload)) OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire { span: span, ctx_token: OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) } end - - private - - def span_name(job) - prefix = if @config[:span_naming] == :job_class - job.class.name - else - job.queue_name - end - - "#{prefix} publish" - end end end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb index 853281a5e4..abdebda2ee 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb @@ -10,6 +10,8 @@ module ActiveJob module Handlers # Handles perform.active_job to generate ingress spans class Perform < Default + EVENT_NAME = 'process'.freeze + # Overrides the `Default#start_span` method to create an ingress span # and registers it with the current context # @@ -19,10 +21,9 @@ class Perform < Default # @return [Hash] with the span and generated context tokens def start_span(name, _id, payload) job = payload.fetch(:job) + span_name = span_name(job, EVENT_NAME) parent_context = OpenTelemetry.propagation.extract(job.__otel_headers) - span_name = span_name(job) - # TODO: Refactor into a propagation strategy propagation_style = @config[:propagation_style] if propagation_style == :child @@ -48,18 +49,6 @@ def attach_consumer_context(span) OpenTelemetry::Context.attach(internal_context) end - - private - - def span_name(job) - prefix = if @config[:span_naming] == :job_class - job.class.name - else - job.queue_name - end - - "#{prefix} process" - end end end end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb index 89a32b6c62..76c0c82c58 100644 --- a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb @@ -15,7 +15,7 @@ let(:spans) { exporter.finished_spans } let(:publish_span) { spans.find { |s| s.name == 'default publish' } } let(:process_span) { spans.find { |s| s.name == 'default process' } } - let(:discard_span) { spans.find { |s| s.name == 'discard.active_job' } } + let(:discard_span) { spans.find { |s| s.name == 'default discard' } } before do OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb index 5b4eb774c3..a1d931755e 100644 --- a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb @@ -15,7 +15,7 @@ let(:spans) { exporter.finished_spans } let(:publish_span) { spans.find { |s| s.name == 'default publish' } } let(:process_span) { spans.find { |s| s.name == 'default process' } } - let(:retry_span) { spans.find { |s| s.name == 'retry_stopped.active_job' } } + let(:retry_span) { spans.find { |s| s.name == 'default retry_stopped' } } before do OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe