From a478894c8706573e11508aac54cd34463f7a4c2e Mon Sep 17 00:00:00 2001 From: Mateusz <76775507+szczygiel-m@users.noreply.github.com> Date: Fri, 14 Jun 2024 12:08:25 +0200 Subject: [PATCH] Added measurement point to consumer profiling (#1869) --- .../tech/hermes/consumers/consumer/ConsumerMessageSender.java | 1 + .../tech/hermes/consumers/consumer/profiling/Measurement.java | 1 + .../tech/hermes/integrationtests/ConsumerProfilingTest.java | 3 +++ 3 files changed, 5 insertions(+) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java index eb37f61a85..151235730d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java @@ -118,6 +118,7 @@ public void sendAsync(Message message, ConsumerProfiler profiler) { } private void sendAsync(Message message, int delayMillis, ConsumerProfiler profiler) { + profiler.measure(Measurement.SCHEDULE_MESSAGE_SENDING); retrySingleThreadExecutor.schedule(() -> sendMessage(message, profiler), delayMillis, TimeUnit.MILLISECONDS); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/profiling/Measurement.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/profiling/Measurement.java index 2de07271fb..a601674687 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/profiling/Measurement.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/profiling/Measurement.java @@ -8,6 +8,7 @@ public class Measurement { public static final String MESSAGE_CONVERSION = "messageConverter.convert"; public static final String OFFER_INFLIGHT_OFFSET = "offsetQueue.offerInflightOffset"; public static final String TRACKERS_LOG_INFLIGHT = "trackers.logInflight"; + public static final String SCHEDULE_MESSAGE_SENDING = "retrySingleThreadExecutor.schedule"; public static final String ACQUIRE_RATE_LIMITER = "acquireRateLimiter"; public static final String MESSAGE_SENDER_SEND = "messageSender.send"; public static final String HANDLERS = "handlers"; diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/ConsumerProfilingTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/ConsumerProfilingTest.java index e673317c2e..64148f67d0 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/ConsumerProfilingTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/ConsumerProfilingTest.java @@ -148,6 +148,7 @@ public void shouldProfileSuccessfulMessageProcessing() { Measurement.MESSAGE_CONVERSION, Measurement.OFFER_INFLIGHT_OFFSET, Measurement.TRACKERS_LOG_INFLIGHT, + Measurement.SCHEDULE_MESSAGE_SENDING, Measurement.ACQUIRE_RATE_LIMITER, Measurement.MESSAGE_SENDER_SEND, Measurement.HANDLERS, @@ -183,6 +184,7 @@ public void shouldProfileDiscardedMessageProcessing() { Measurement.MESSAGE_CONVERSION, Measurement.OFFER_INFLIGHT_OFFSET, Measurement.TRACKERS_LOG_INFLIGHT, + Measurement.SCHEDULE_MESSAGE_SENDING, Measurement.ACQUIRE_RATE_LIMITER, Measurement.MESSAGE_SENDER_SEND, Measurement.HANDLERS, @@ -218,6 +220,7 @@ public void shouldProfileRetriedMessageProcessing() { Measurement.MESSAGE_CONVERSION, Measurement.OFFER_INFLIGHT_OFFSET, Measurement.TRACKERS_LOG_INFLIGHT, + Measurement.SCHEDULE_MESSAGE_SENDING, Measurement.ACQUIRE_RATE_LIMITER, Measurement.MESSAGE_SENDER_SEND, Measurement.HANDLERS,