From 4e7dd5f680a4795245f84f371d8488339e5fabe8 Mon Sep 17 00:00:00 2001 From: Oliver Drotbohm Date: Thu, 23 May 2024 17:22:40 +0200 Subject: [PATCH] GH-637 - Enable TraceContext propagation across asynchronous threads. We're now registering a ThreadPoolTaskScheduler and customize the resources created to propagate the Trace context into threads created by it. This is necessary to connect the tracing spans created for events handled asynchronously. --- .../ModuleObservabilityAutoConfiguration.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/spring-modulith-observability/src/main/java/org/springframework/modulith/observability/autoconfigure/ModuleObservabilityAutoConfiguration.java b/spring-modulith-observability/src/main/java/org/springframework/modulith/observability/autoconfigure/ModuleObservabilityAutoConfiguration.java index a4e406fd1..2b001d739 100644 --- a/spring-modulith-observability/src/main/java/org/springframework/modulith/observability/autoconfigure/ModuleObservabilityAutoConfiguration.java +++ b/spring-modulith-observability/src/main/java/org/springframework/modulith/observability/autoconfigure/ModuleObservabilityAutoConfiguration.java @@ -22,22 +22,36 @@ import brave.handler.MutableSpan; import brave.handler.SpanHandler; import brave.propagation.TraceContext; +import io.micrometer.context.ContextExecutorService; +import io.micrometer.context.ContextScheduledExecutorService; +import io.micrometer.context.ContextSnapshot; +import io.micrometer.context.ContextSnapshotFactory; import io.micrometer.tracing.Tracer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.function.Supplier; + import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.modulith.observability.ModuleEventListener; import org.springframework.modulith.observability.ModuleTracingBeanPostProcessor; import org.springframework.modulith.runtime.ApplicationModulesRuntime; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * @author Oliver Drotbohm */ @Configuration(proxyBeanMethods = false) +@AutoConfigureBefore(TaskExecutionAutoConfiguration.class) @ConditionalOnProperty(name = "management.tracing.enabled", havingValue = "true", matchIfMissing = true) class ModuleObservabilityAutoConfiguration { @@ -53,6 +67,39 @@ static ModuleEventListener tracingModuleEventListener(ApplicationModulesRuntime return new ModuleEventListener(runtime, () -> tracer.getObject()); } + /** + * Custom override of default {@link ThreadPoolTaskScheduler} to make sure asynchronous method invocations get the + * {@link io.micrometer.tracing.handler.TracingObservationHandler.TracingContext} forwarded into threads spawned for + * those methods. The name of the bean is important for it to be picked up by the async invocation + * infrastructure! + */ + @Bean(name = "taskExecutor", destroyMethod = "shutdown") + ThreadPoolTaskScheduler threadPoolTaskScheduler() { + + ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler() { + + private static final long serialVersionUID = -3935299327010101697L; + private final Supplier captureAll = () -> ContextSnapshotFactory.builder().build().captureAll(); + + @Override + protected ExecutorService initializeExecutor(ThreadFactory threadFactory, + RejectedExecutionHandler rejectedExecutionHandler) { + + return ContextExecutorService.wrap(super.initializeExecutor(threadFactory, rejectedExecutionHandler), + captureAll); + } + + @Override + public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException { + return ContextScheduledExecutorService.wrap(super.getScheduledExecutor(), captureAll); + } + }; + + threadPoolTaskScheduler.initialize(); + + return threadPoolTaskScheduler; + } + /** * Brave-specific auto configuration. *