Skip to content

Commit

Permalink
Add SimpleSpanProcessor support
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Oct 31, 2024
1 parent 80ed777 commit 75b098a
Show file tree
Hide file tree
Showing 22 changed files with 355 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ AdditionalBeanBuildItem ensureProducerIsRetained() {
return AdditionalBeanBuildItem.builder()
.setUnremovable()
.addBeanClasses(
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.SimpleLogRecordProcessorCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.TracingResourceCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.SamplerCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.TracerProviderCustomizer.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterConfigBuilder;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig;
import io.quarkus.opentelemetry.runtime.exporter.otlp.OTelExporterRecorder;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundSpanProcessor;
import io.quarkus.tls.TlsConfigurationRegistry;
import io.quarkus.tls.TlsRegistryBuildItem;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
Expand Down Expand Up @@ -84,7 +84,8 @@ void config(BuildProducer<RunTimeConfigBuilderBuildItem> runTimeConfigBuilderPro
@BuildStep(onlyIf = OtlpExporterProcessor.OtlpTracingExporterEnabled.class)
@Record(ExecutionTime.RUNTIME_INIT)
@Consume(TlsRegistryBuildItem.class)
void createBatchSpanProcessor(OTelExporterRecorder recorder,
void createSpanProcessor(OTelExporterRecorder recorder,
OTelBuildConfig oTelBuildConfig,
OTelRuntimeConfig otelRuntimeConfig,
OtlpExporterRuntimeConfig exporterRuntimeConfig,
CoreVertxBuildItem vertxBuildItem,
Expand All @@ -95,16 +96,16 @@ void createBatchSpanProcessor(OTelExporterRecorder recorder,
return;
}
syntheticBeanBuildItemBuildProducer.produce(SyntheticBeanBuildItem
.configure(LateBoundBatchSpanProcessor.class)
.configure(LateBoundSpanProcessor.class)
.types(SpanProcessor.class)
.setRuntimeInit()
.scope(Singleton.class)
.unremovable()
.addInjectionPoint(ParameterizedType.create(DotName.createSimple(Instance.class),
new Type[] { ClassType.create(DotName.createSimple(SpanExporter.class.getName())) }, null))
.addInjectionPoint(ClassType.create(DotName.createSimple(TlsConfigurationRegistry.class)))
.createWith(recorder.batchSpanProcessorForOtlp(otelRuntimeConfig, exporterRuntimeConfig,
vertxBuildItem.getVertx()))
.createWith(recorder.spanProcessorForOtlp(oTelBuildConfig, otelRuntimeConfig,
exporterRuntimeConfig, vertxBuildItem.getVertx()))
.done());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundSpanProcessor;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.vertx.HttpInstrumenterVertxTracer;
import io.quarkus.test.QuarkusUnitTest;
import io.vertx.core.spi.observability.HttpRequest;
Expand All @@ -26,7 +26,7 @@ public class OpenTelemetryDisabledSdkTest {
.overrideConfigKey("quarkus.otel.sdk.disabled", "true");

@Inject
LateBoundBatchSpanProcessor batchSpanProcessor;
LateBoundSpanProcessor spanProcessor;

@Inject
OpenTelemetry openTelemetry;
Expand All @@ -40,7 +40,7 @@ public class OpenTelemetryDisabledSdkTest {
@Test
void testNoTracer() {
// The OTel API doesn't provide a clear way to check if a tracer is an effective NOOP tracer.
Assertions.assertTrue(batchSpanProcessor.isDelegateNull(), "BatchSpanProcessor delegate must not be set");
Assertions.assertTrue(spanProcessor.isDelegateNull(), "SpanProcessor delegate must not be set");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundSpanProcessor;
import io.quarkus.test.QuarkusUnitTest;

public class OtlpTraceExporterDisabledTest {
Expand All @@ -25,15 +25,15 @@ public class OtlpTraceExporterDisabledTest {
OpenTelemetry openTelemetry;

@Inject
Instance<LateBoundBatchSpanProcessor> lateBoundBatchSpanProcessorInstance;
Instance<LateBoundSpanProcessor> lateBoundSpanProcessorInstance;

@Inject
Instance<MetricExporter> metricExporters;

@Test
void testOpenTelemetryButNoBatchSpanProcessor() {
assertNotNull(openTelemetry);
assertFalse(lateBoundBatchSpanProcessorInstance.isResolvable());
assertFalse(lateBoundSpanProcessorInstance.isResolvable());
assertFalse(metricExporters.isResolvable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.IdGenerator;
Expand All @@ -27,7 +30,7 @@
import io.quarkus.arc.All;
import io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.RemoveableLateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.RemoveableLateBoundSpanProcessor;
import io.quarkus.opentelemetry.runtime.propagation.TextMapPropagatorCustomizer;
import io.quarkus.opentelemetry.runtime.tracing.DelayedAttributes;
import io.quarkus.opentelemetry.runtime.tracing.DropTargetsSampler;
Expand All @@ -39,6 +42,46 @@ public interface AutoConfiguredOpenTelemetrySdkBuilderCustomizer {

void customize(AutoConfiguredOpenTelemetrySdkBuilder builder);

@Singleton
final class SimpleLogRecordProcessorCustomizer implements AutoConfiguredOpenTelemetrySdkBuilderCustomizer {
private SimpleLogRecordProcessorBiFunction biFunction;

public SimpleLogRecordProcessorCustomizer(
OTelBuildConfig oTelBuildConfig,
LogRecordExporter logRecordExporter) {
if (oTelBuildConfig.simple()) {
LogRecordProcessor lrp = SimpleLogRecordProcessor.create(logRecordExporter);
this.biFunction = new SimpleLogRecordProcessorBiFunction(lrp);
}
}

@Override
public void customize(AutoConfiguredOpenTelemetrySdkBuilder builder) {
if (biFunction != null) {
builder.addLogRecordProcessorCustomizer(biFunction);
}
}
}

class SimpleLogRecordProcessorBiFunction
implements BiFunction<LogRecordProcessor, ConfigProperties, LogRecordProcessor> {

private final LogRecordProcessor logRecordProcessor;

public SimpleLogRecordProcessorBiFunction(LogRecordProcessor logRecordProcessor) {
this.logRecordProcessor = logRecordProcessor;
}

@Override
public LogRecordProcessor apply(LogRecordProcessor lrp, ConfigProperties cp) {
if (!(lrp instanceof SimpleLogRecordProcessor)) {
return logRecordProcessor;
} else {
return lrp;
}
}
}

@Singleton
final class TracingResourceCustomizer implements AutoConfiguredOpenTelemetrySdkBuilderCustomizer {

Expand Down Expand Up @@ -174,7 +217,7 @@ public SdkTracerProviderBuilder apply(SdkTracerProviderBuilder tracerProviderBui
spanProcessors.stream().filter(new Predicate<SpanProcessor>() {
@Override
public boolean test(SpanProcessor sp) {
return !(sp instanceof RemoveableLateBoundBatchSpanProcessor);
return !(sp instanceof RemoveableLateBoundSpanProcessor);
}
})
.forEach(tracerProviderBuilder::addSpanProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ public interface OTelBuildConfig {
@WithDefault("true")
boolean enabled();

/**
* Should we use simple processor for spans and log records.
* This will disable batch processing and the exporter will send
* telemetry data right away.
* This is recommended for serverless applications.
* <p>
* Defaults to <code>false</code>.
*/
@WithDefault("false")
boolean simple();

/**
* Trace exporter configurations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregationUtil;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.arc.SyntheticCreationalContext;
import io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.BatchSpanProcessorConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.*;
import io.quarkus.opentelemetry.runtime.exporter.otlp.logs.NoopLogRecordExporter;
Expand All @@ -48,8 +52,8 @@
import io.quarkus.opentelemetry.runtime.exporter.otlp.metrics.VertxHttpMetricsExporter;
import io.quarkus.opentelemetry.runtime.exporter.otlp.sender.VertxGrpcSender;
import io.quarkus.opentelemetry.runtime.exporter.otlp.sender.VertxHttpSender;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.RemoveableLateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.RemoveableLateBoundSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.VertxGrpcSpanExporter;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.VertxHttpSpanExporter;
import io.quarkus.runtime.annotations.Recorder;
Expand All @@ -68,23 +72,24 @@ public class OTelExporterRecorder {
public static final String BASE2EXPONENTIAL_AGGREGATION_NAME = AggregationUtil
.aggregationName(Aggregation.base2ExponentialBucketHistogram());

public Function<SyntheticCreationalContext<LateBoundBatchSpanProcessor>, LateBoundBatchSpanProcessor> batchSpanProcessorForOtlp(
public Function<SyntheticCreationalContext<LateBoundSpanProcessor>, LateBoundSpanProcessor> spanProcessorForOtlp(
OTelBuildConfig oTelBuildConfig,
OTelRuntimeConfig otelRuntimeConfig,
OtlpExporterRuntimeConfig exporterRuntimeConfig,
Supplier<Vertx> vertx) {
URI baseUri = getTracesUri(exporterRuntimeConfig); // do the creation and validation here in order to preserve backward compatibility
return new Function<>() {
@Override
public LateBoundBatchSpanProcessor apply(
SyntheticCreationalContext<LateBoundBatchSpanProcessor> context) {
public LateBoundSpanProcessor apply(
SyntheticCreationalContext<LateBoundSpanProcessor> context) {
if (otelRuntimeConfig.sdkDisabled() || baseUri == null) {
return RemoveableLateBoundBatchSpanProcessor.INSTANCE;
return RemoveableLateBoundSpanProcessor.INSTANCE;
}
// Only create the OtlpGrpcSpanExporter if an endpoint was set in runtime config and was properly validated at startup
Instance<SpanExporter> spanExporters = context.getInjectedReference(new TypeLiteral<>() {
});
if (!spanExporters.isUnsatisfied()) {
return RemoveableLateBoundBatchSpanProcessor.INSTANCE;
return RemoveableLateBoundSpanProcessor.INSTANCE;
}

try {
Expand All @@ -93,15 +98,21 @@ public LateBoundBatchSpanProcessor apply(
var spanExporter = createSpanExporter(exporterRuntimeConfig, vertx.get(), baseUri,
tlsConfigurationRegistry);

BatchSpanProcessorBuilder processorBuilder = BatchSpanProcessor.builder(spanExporter);
if (oTelBuildConfig.simple()) {
SimpleSpanProcessorBuilder processorBuilder = SimpleSpanProcessor.builder(spanExporter);
return new LateBoundSpanProcessor(processorBuilder.build());
} else {
BatchSpanProcessorBuilder processorBuilder = BatchSpanProcessor.builder(spanExporter);

processorBuilder.setScheduleDelay(otelRuntimeConfig.bsp().scheduleDelay());
processorBuilder.setMaxQueueSize(otelRuntimeConfig.bsp().maxQueueSize());
processorBuilder.setMaxExportBatchSize(otelRuntimeConfig.bsp().maxExportBatchSize());
processorBuilder.setExporterTimeout(otelRuntimeConfig.bsp().exportTimeout());
// processorBuilder.setMeterProvider() // TODO add meter provider to span processor.
BatchSpanProcessorConfig bspc = otelRuntimeConfig.bsp();
processorBuilder.setScheduleDelay(bspc.scheduleDelay());
processorBuilder.setMaxQueueSize(bspc.maxQueueSize());
processorBuilder.setMaxExportBatchSize(bspc.maxExportBatchSize());
processorBuilder.setExporterTimeout(bspc.exportTimeout());
// processorBuilder.setMeterProvider() // TODO add meter provider to span processor.

return new LateBoundBatchSpanProcessor(processorBuilder.build());
return new LateBoundSpanProcessor(processorBuilder.build());
}
} catch (IllegalArgumentException iae) {
throw new IllegalStateException("Unable to install OTLP Exporter", iae);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;

/**
* Class to facilitate a delay in when the worker thread inside {@link BatchSpanProcessor}
* Class to facilitate a delay in when the worker thread inside {@link SpanProcessor}
* is started, enabling Quarkus to instantiate a {@link io.opentelemetry.api.trace.TracerProvider}
* during static initialization and set a {@link BatchSpanProcessor} delegate during runtime initialization.
* during static initialization and set a {@link SpanProcessor} delegate during runtime initialization.
*/
public class LateBoundBatchSpanProcessor implements SpanProcessor {
private static final Logger log = Logger.getLogger(LateBoundBatchSpanProcessor.class);
public class LateBoundSpanProcessor implements SpanProcessor {
private static final Logger log = Logger.getLogger(LateBoundSpanProcessor.class);

private boolean warningLogged = false;
private BatchSpanProcessor delegate;
private SpanProcessor delegate;

public LateBoundBatchSpanProcessor(BatchSpanProcessor delegate) {
public LateBoundSpanProcessor(SpanProcessor delegate) {
this.delegate = delegate;
}

Expand Down Expand Up @@ -104,7 +103,7 @@ private void resetDelegate() {
*/
private void logDelegateNotFound() {
if (!warningLogged) {
log.warn("No BatchSpanProcessor delegate specified, no action taken.");
log.warn("No SpanProcessor delegate specified, no action taken.");
warningLogged = true;
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.opentelemetry.runtime.exporter.otlp.tracing;

import io.quarkus.opentelemetry.runtime.AutoConfiguredOpenTelemetrySdkBuilderCustomizer.TracerProviderCustomizer;

/**
* The only point in having this class is to allow {@link TracerProviderCustomizer}
* to easily ignore the configured {@link LateBoundSpanProcessor}.
*/
public final class RemoveableLateBoundSpanProcessor extends LateBoundSpanProcessor {

public static final RemoveableLateBoundSpanProcessor INSTANCE = new RemoveableLateBoundSpanProcessor();

private RemoveableLateBoundSpanProcessor() {
super(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.quarkus.it.opentelemetry.vertx.exporter;

import java.util.Map;

import io.quarkus.test.junit.QuarkusTestProfile;

public final class SimpleProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
return Map.of("quarkus.otel.simple", "true");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.it.opentelemetry.vertx.exporter.grpc;

import io.quarkus.it.opentelemetry.vertx.exporter.AbstractExporterTest;
import io.quarkus.it.opentelemetry.vertx.exporter.OtelCollectorLifecycleManager;
import io.quarkus.it.opentelemetry.vertx.exporter.SimpleProfile;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, restrictToAnnotatedClass = true)
@TestProfile(SimpleProfile.class)
public class SimpleGrpcNoTLSNoCompressionTest extends AbstractExporterTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.it.opentelemetry.vertx.exporter.grpc;

import io.quarkus.it.opentelemetry.vertx.exporter.AbstractExporterTest;
import io.quarkus.it.opentelemetry.vertx.exporter.OtelCollectorLifecycleManager;
import io.quarkus.it.opentelemetry.vertx.exporter.SimpleProfile;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.ResourceArg;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, initArgs = @ResourceArg(name = "enableCompression", value = "true"), restrictToAnnotatedClass = true)
@TestProfile(SimpleProfile.class)
public class SimpleGrpcNoTLSWithCompressionTest extends AbstractExporterTest {

}
Loading

0 comments on commit 75b098a

Please sign in to comment.