Skip to content

Commit

Permalink
Added custom sampler support based on action in request (#10136)
Browse files Browse the repository at this point in the history
* Added custom sampler support based on action in request

Signed-off-by: Dev Agarwal <[email protected]>

* UT Fix

Signed-off-by: Dev Agarwal <[email protected]>

* Added Transport action sampler, which will sample based on different probability for all actions

Signed-off-by: Dev Agarwal <[email protected]>

* Added Transport action sampler, which will sample based on different probability for all actions. Also added setting to define order of samplers

Signed-off-by: Dev Agarwal <[email protected]>

* Added missing java-doc

Signed-off-by: Dev Agarwal <[email protected]>

* Moving sampler class settings to OtelTelemetry setting

Signed-off-by: Dev Agarwal <[email protected]>

* Minor refactor

Signed-off-by: Dev Agarwal <[email protected]>

* Refactored to use chain of samplers

Signed-off-by: Dev Agarwal <[email protected]>

* Addressed comments

Signed-off-by: Dev Agarwal <[email protected]>

* Addressed comments to move action_probability to OtelTelemetrySettings

Signed-off-by: Dev Agarwal <[email protected]>

* Updated eror msg returned when Sampler class is not found

Signed-off-by: Dev Agarwal <[email protected]>

* Added UT for OTelSamplerFactory

Signed-off-by: Dev Agarwal <[email protected]>

* minor refactor

Signed-off-by: Dev Agarwal <[email protected]>

* minor refactor

Signed-off-by: Dev Agarwal <[email protected]>

* spotless check

Signed-off-by: Dev Agarwal <[email protected]>

* Updating OtelTelemetryPlugin.get() method

Signed-off-by: Dev Agarwal <[email protected]>

* Addressed comments

Signed-off-by: Dev Agarwal <[email protected]>

* minor refactor

Signed-off-by: Dev Agarwal <[email protected]>

* addressed comments

Signed-off-by: Dev Agarwal <[email protected]>

* Updated transport action sampler

Signed-off-by: Dev Agarwal <[email protected]>

* Empty-Commit

Signed-off-by: Dev Agarwal <[email protected]>

* Empty-Commit

Signed-off-by: Dev Agarwal <[email protected]>

---------

Signed-off-by: Dev Agarwal <[email protected]>
(cherry picked from commit 445bf1f)
  • Loading branch information
devagarwal1803 committed Feb 7, 2024
1 parent ac37b38 commit 7bde40a
Show file tree
Hide file tree
Showing 15 changed files with 547 additions and 73 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))
- [Query Insights] Query Insights Framework which currently supports retrieving the most time-consuming queries within the last configured time window ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903))
- [Query Insights] Implement Top N Queries feature to collect and gather information about high latency queries in a window ([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904))
- Add override support for sampling based on action ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))
- Added custom sampler support based on transport action in request ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public List<Setting<?>> getSettings() {
OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING,
OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING,
OTelTelemetrySettings.OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING,
OTelTelemetrySettings.OTEL_METRICS_EXPORTER_CLASS_SETTING
OTelTelemetrySettings.OTEL_TRACER_SPAN_SAMPLER_CLASS_SETTINGS,
OTelTelemetrySettings.OTEL_METRICS_EXPORTER_CLASS_SETTING,
OTelTelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.telemetry.metrics.exporter.OTelMetricsExporterFactory;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.sampler.OTelSamplerFactory;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticTransportActionSampler;

import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;

import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;

/**
* OTel specific telemetry settings.
Expand Down Expand Up @@ -110,4 +116,40 @@ private OTelTelemetrySettings() {}
Setting.Property.NodeScope,
Setting.Property.Final
);

/**
* Samplers orders setting.
*/
@SuppressWarnings("unchecked")
public static final Setting<List<Class<Sampler>>> OTEL_TRACER_SPAN_SAMPLER_CLASS_SETTINGS = Setting.listSetting(
"telemetry.otel.tracer.span.sampler.classes",
Arrays.asList(ProbabilisticTransportActionSampler.class.getName(), ProbabilisticSampler.class.getName()),
sampler -> {
// Check we ourselves are not being called by unprivileged code.
SpecialPermission.check();
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<Class<Sampler>>) () -> {
final ClassLoader loader = OTelSamplerFactory.class.getClassLoader();
return (Class<Sampler>) loader.loadClass(sampler);
});
} catch (PrivilegedActionException ex) {
throw new IllegalStateException("Unable to load sampler class: " + sampler, ex.getCause());

Check warning on line 136 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java#L135-L136

Added lines #L135 - L136 were not covered by tests
}
},
Setting.Property.NodeScope,
Setting.Property.Final
);

/**
* Probability of action based sampler
*/
public static final Setting<Double> TRACER_SAMPLER_ACTION_PROBABILITY = Setting.doubleSetting(
"telemetry.tracer.action.sampler.probability",
0.001d,
0.000d,
1.00d,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.metrics.exporter.OTelMetricsExporterFactory;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;
import org.opensearch.telemetry.tracing.sampler.OTelSamplerFactory;
import org.opensearch.telemetry.tracing.sampler.RequestSampler;

import java.security.AccessController;
Expand Down Expand Up @@ -60,7 +60,7 @@ public static OpenTelemetrySdk get(TelemetrySettings telemetrySettings, Settings
settings,
OTelSpanExporterFactory.create(settings),
ContextPropagators.create(W3CTraceContextPropagator.getInstance()),
Sampler.parentBased(new RequestSampler(new ProbabilisticSampler(telemetrySettings)))
Sampler.parentBased(new RequestSampler(OTelSamplerFactory.create(telemetrySettings, settings)))
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.SpecialPermission;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.OTelTelemetrySettings;
import org.opensearch.telemetry.TelemetrySettings;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.ListIterator;

import io.opentelemetry.sdk.trace.samplers.Sampler;

/**
* Factory class to create the instance of OTelSampler
*/
public class OTelSamplerFactory {

/**
* Logger instance for logging messages related to the OTelSamplerFactory.
*/
private static final Logger logger = LogManager.getLogger(OTelSamplerFactory.class);

/**
* Base constructor.
*/
private OTelSamplerFactory() {

}

/**
* Creates the {@link Sampler} instances based on the TRACER_SPAN_SAMPLER_CLASSES value.
*
* @param telemetrySettings TelemetrySettings.
* @param settings the settings
* @return list of samplers.
*/
public static Sampler create(TelemetrySettings telemetrySettings, Settings settings) {
List<Class<Sampler>> samplersNameList = OTelTelemetrySettings.OTEL_TRACER_SPAN_SAMPLER_CLASS_SETTINGS.get(settings);
ListIterator<Class<Sampler>> li = samplersNameList.listIterator(samplersNameList.size());

Sampler fallbackSampler = null;

// Iterating samplers list in reverse order to create chain of sampler
while (li.hasPrevious()) {
Class<Sampler> samplerName = li.previous();
fallbackSampler = instantiateSampler(samplerName, telemetrySettings, settings, fallbackSampler);
}

return fallbackSampler;
}

private static Sampler instantiateSampler(
Class<Sampler> samplerClassName,
TelemetrySettings telemetrySettings,
Settings settings,
Sampler fallbackSampler
) {
try {
// Check we ourselves are not being called by unprivileged code.
SpecialPermission.check();

return AccessController.doPrivileged((PrivilegedExceptionAction<Sampler>) () -> {
try {
// Define the method type which receives TelemetrySettings & Sampler as arguments
MethodType methodType = MethodType.methodType(Sampler.class, TelemetrySettings.class, Settings.class, Sampler.class);

return (Sampler) MethodHandles.publicLookup()
.findStatic(samplerClassName, "create", methodType)
.invokeExact(telemetrySettings, settings, fallbackSampler);
} catch (Throwable e) {

Check warning on line 84 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java#L84

Added line #L84 was not covered by tests
if (e.getCause() instanceof NoSuchMethodException) {
throw new IllegalStateException("No create method exist in [" + samplerClassName + "]", e.getCause());

Check warning on line 86 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java#L86

Added line #L86 was not covered by tests
} else {
throw new IllegalStateException("Sampler instantiation failed for class [" + samplerClassName + "]", e.getCause());

Check warning on line 88 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java#L88

Added line #L88 was not covered by tests
}
}
});
} catch (Exception e) {
throw new IllegalStateException("Sampler instantiation failed for class [" + samplerClassName + "]", e.getCause());

Check warning on line 93 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java#L92-L93

Added lines #L92 - L93 were not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;

import java.util.List;
Expand All @@ -18,36 +19,43 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

/**
* ProbabilisticSampler implements a head-based sampling strategy based on provided settings.
* ProbabilisticSampler implements a probability sampling strategy based on configured sampling ratio.
*/
public class ProbabilisticSampler implements Sampler {
private Sampler defaultSampler;
private final TelemetrySettings telemetrySettings;
private final Settings settings;
private final Sampler fallbackSampler;

private double samplingRatio;

/**
* Constructor
*
* @param telemetrySettings Telemetry settings.
*/
public ProbabilisticSampler(TelemetrySettings telemetrySettings) {
private ProbabilisticSampler(TelemetrySettings telemetrySettings, Settings settings, Sampler fallbackSampler) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.settings = Objects.requireNonNull(settings);
this.samplingRatio = telemetrySettings.getSamplingProbability();
this.defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
this.fallbackSampler = fallbackSampler;
}

Sampler getSampler() {
double newSamplingRatio = telemetrySettings.getSamplingProbability();
if (isSamplingRatioChanged(newSamplingRatio)) {
synchronized (this) {
this.samplingRatio = newSamplingRatio;
defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}
}
return defaultSampler;
/**
* Create probabilistic sampler.
*
* @param telemetrySettings the telemetry settings
* @param settings the settings
* @param fallbackSampler the fallback sampler
* @return the probabilistic sampler
*/
public static Sampler create(TelemetrySettings telemetrySettings, Settings settings, Sampler fallbackSampler) {
return new ProbabilisticSampler(telemetrySettings, settings, fallbackSampler);
}

private boolean isSamplingRatioChanged(double newSamplingRatio) {
Expand All @@ -67,7 +75,19 @@ public SamplingResult shouldSample(
Attributes attributes,
List<LinkData> parentLinks
) {
return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
double newSamplingRatio = telemetrySettings.getSamplingProbability();
if (isSamplingRatioChanged(newSamplingRatio)) {
synchronized (this) {
this.samplingRatio = newSamplingRatio;
defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}
}
final SamplingResult result = defaultSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
if (result.getDecision() != SamplingDecision.DROP && fallbackSampler != null) {
return fallbackSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);

Check warning on line 87 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java#L87

Added line #L87 was not covered by tests
} else {
return result;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.OTelTelemetrySettings;
import org.opensearch.telemetry.TelemetrySettings;

import java.util.List;
import java.util.Objects;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

import static org.opensearch.telemetry.tracing.AttributeNames.TRANSPORT_ACTION;

/**
* ProbabilisticTransportActionSampler sampler samples request with action based on defined probability
*/
public class ProbabilisticTransportActionSampler implements Sampler {

private final Sampler fallbackSampler;
private Sampler actionSampler;
private final TelemetrySettings telemetrySettings;
private final Settings settings;
private double actionSamplingRatio;

/**
* Creates ProbabilisticTransportActionSampler sampler
* @param telemetrySettings TelemetrySettings
*/
private ProbabilisticTransportActionSampler(TelemetrySettings telemetrySettings, Settings settings, Sampler fallbackSampler) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.settings = Objects.requireNonNull(settings);
this.actionSamplingRatio = OTelTelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY.get(settings);
this.actionSampler = Sampler.traceIdRatioBased(actionSamplingRatio);
this.fallbackSampler = fallbackSampler;
}

/**
* Create probabilistic transport action sampler.
*
* @param telemetrySettings the telemetry settings
* @param settings the settings
* @param fallbackSampler the fallback sampler
* @return the probabilistic transport action sampler
*/
public static Sampler create(TelemetrySettings telemetrySettings, Settings settings, Sampler fallbackSampler) {
return new ProbabilisticTransportActionSampler(telemetrySettings, settings, fallbackSampler);
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks
) {
final String action = attributes.get(AttributeKey.stringKey(TRANSPORT_ACTION));
if (action != null) {
final SamplingResult result = actionSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
if (result.getDecision() != SamplingDecision.DROP && fallbackSampler != null) {
return fallbackSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}
return result;
}
if (fallbackSampler != null) return fallbackSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);

return SamplingResult.drop();

Check warning on line 83 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java#L83

Added line #L83 was not covered by tests
}

double getSamplingRatio() {
return actionSamplingRatio;
}

@Override
public String getDescription() {
return "Transport Action Sampler";

Check warning on line 92 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java#L92

Added line #L92 was not covered by tests
}

@Override
public String toString() {
return getDescription();

Check warning on line 97 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java#L97

Added line #L97 was not covered by tests
}
}
Loading

0 comments on commit 7bde40a

Please sign in to comment.