Skip to content

Commit

Permalink
Added Transport action sampler, which will sample based on different …
Browse files Browse the repository at this point in the history
…probability for all actions. Also added setting to define order of samplers

Signed-off-by: Dev Agarwal <[email protected]>
  • Loading branch information
devagarwal1803 committed Feb 1, 2024
1 parent 8ff9fe3 commit 481bd69
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@
import static org.opensearch.telemetry.tracing.AttributeNames.TRANSPORT_ACTION;

/**
* TransportActionSampler sampler samples request with action based on defined probability
* ProbabilisticTransportActionSampler sampler samples request with action based on defined probability
*/
public class TransportActionSampler implements Sampler {
public class ProbabilisticTransportActionSampler implements Sampler {
private Sampler actionSampler;
private final TelemetrySettings telemetrySettings;
private double actionSamplingRatio;

/**
* Creates TransportActionSampler sampler
* Creates ProbabilisticTransportActionSampler sampler
* @param telemetrySettings TelemetrySettings
*/
public TransportActionSampler(TelemetrySettings telemetrySettings) {
public ProbabilisticTransportActionSampler(TelemetrySettings telemetrySettings) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.actionSamplingRatio = telemetrySettings.getActionSamplingProbability();
this.actionSampler = Sampler.traceIdRatioBased(actionSamplingRatio);
Expand All @@ -53,7 +53,7 @@ public SamplingResult shouldSample(
final String action = attributes.get(AttributeKey.stringKey(TRANSPORT_ACTION));
if (action != null) {
double newActionSamplingRatio = telemetrySettings.getActionSamplingProbability();
if (isSamplingRatioChanged(newActionSamplingRatio)) {
if (isActionSamplingRatioChanged(newActionSamplingRatio)) {
synchronized (this) {
this.actionSamplingRatio = newActionSamplingRatio;
actionSampler = Sampler.traceIdRatioBased(actionSamplingRatio);
Expand All @@ -64,7 +64,7 @@ public SamplingResult shouldSample(
return SamplingResult.drop();
}

private boolean isSamplingRatioChanged(double newSamplingRatio) {
private boolean isActionSamplingRatioChanged(double newSamplingRatio) {
return Double.compare(this.actionSamplingRatio, newSamplingRatio) != 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@

package org.opensearch.telemetry.tracing.sampler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.telemetry.TelemetrySettings;

import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.List;

import io.opentelemetry.api.common.AttributeKey;
Expand All @@ -21,23 +25,40 @@
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

import static org.opensearch.telemetry.tracing.AttributeNames.TRACE;
import static org.opensearch.telemetry.tracing.AttributeNames.TRANSPORT_ACTION;

/**
* HeadBased sampler
* RequestSampler based on HeadBased sampler
*/
public class RequestSampler implements Sampler {
private final Sampler defaultSampler;

private final Sampler actionSampler;
private final HashMap<String, Sampler> samplers;
private final TelemetrySettings telemetrySettings;
protected Logger logger;

/**
* Creates action sampler which samples request for all actions based on defined probability
* Creates request sampler which applies based on all applicable sampler
* @param telemetrySettings TelemetrySettings
*/
public RequestSampler(TelemetrySettings telemetrySettings) {
this.defaultSampler = new ProbabilisticSampler(telemetrySettings);
this.actionSampler = new TransportActionSampler(telemetrySettings);
this.telemetrySettings = telemetrySettings;
this.samplers = new HashMap<>();
this.logger = LogManager.getLogger(getClass());
this.SamplerInit();
}

/**
* Initialises all samplers based on telemetry setting
*/
private void SamplerInit() {
for (String samplerName : this.telemetrySettings.getSamplingOrder()) {
try {
Class<?> samplerClass = Class.forName(samplerName);
Constructor<?> constructor = samplerClass.getConstructor(TelemetrySettings.class);
Sampler sampler = (Sampler) constructor.newInstance(telemetrySettings);
this.samplers.put(samplerName, sampler);
} catch (Exception e) {
logger.error("error while creatin class object: ", e);
}
}
}

@Override
Expand All @@ -50,17 +71,20 @@ public SamplingResult shouldSample(
List<LinkData> parentLinks
) {
final String trace = attributes.get(AttributeKey.stringKey(TRACE));
final String action = attributes.get(AttributeKey.stringKey(TRANSPORT_ACTION));

// Determine the sampling decision based on the availability of trace information and action,
// either recording and sampling, delegating to action sampler, or falling back to default sampler.
List<String> samplers = telemetrySettings.getSamplingOrder();
if (trace != null) {
return (Boolean.parseBoolean(trace) == true) ? SamplingResult.recordAndSample() : SamplingResult.drop();
} else if (action != null) {
return actionSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
} else {
return defaultSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

for (String samplerName : samplers) {
SamplingResult result = this.samplers.get(samplerName)
.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
if (result == SamplingResult.recordAndSample()) {
return result;
}
}
return SamplingResult.drop();
}

@Override
Expand All @@ -72,4 +96,8 @@ public String getDescription() {
public String toString() {
return getDescription();
}

public HashMap<String, Sampler> getSamplers() {
return samplers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SPAN_SAMPLER_CLASSES;

public class OTelTelemetryPluginTests extends OpenSearchTestCase {

Expand All @@ -54,7 +55,15 @@ public void setup() {
telemetry = oTelTelemetryPlugin.getTelemetry(
new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_ENABLED_SETTING, TRACER_SAMPLER_PROBABILITY, TRACER_SAMPLER_ACTION_PROBABILITY))
new ClusterSettings(
settings,
Set.of(
TRACER_ENABLED_SETTING,
TRACER_SAMPLER_PROBABILITY,
TRACER_SAMPLER_ACTION_PROBABILITY,
TRACER_SPAN_SAMPLER_CLASSES
)
)
)
);
tracingTelemetry = telemetry.get().getTracingTelemetry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SPAN_SAMPLER_CLASSES;
import static org.mockito.Mockito.mock;

public class ProbabilisticSamplerTests extends OpenSearchTestCase {
Expand All @@ -36,7 +37,10 @@ public void testDefaultGetSampler() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY))
new ClusterSettings(
settings,
Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY, TRACER_SPAN_SAMPLER_CLASSES)
)
);

// Probabilistic Sampler
Expand All @@ -49,7 +53,10 @@ public void testGetSamplerWithUpdatedSamplingRatio() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY))
new ClusterSettings(
settings,
Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY, TRACER_SPAN_SAMPLER_CLASSES)
)
);

// Probabilistic Sampler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,31 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SPAN_SAMPLER_CLASSES;
import static org.opensearch.telemetry.tracing.AttributeNames.TRANSPORT_ACTION;
import static org.mockito.Mockito.mock;

public class TransportActionSamplerTests extends OpenSearchTestCase {
public class ProbabilisticTransportActionSamplerTests extends OpenSearchTestCase {

public void testGetSamplerWithDefaultActionSamplingRatio() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY))
);

// TransportActionSampler
TransportActionSampler transportActionSampler = new TransportActionSampler(telemetrySettings);

// Validates if default sampling of 1%
assertEquals(0.001d, transportActionSampler.getSamplingRatio(), 0.0d);
}

public void testGetSamplerWithUpdatedActionSamplingRatio() {
public void testGetSamplerWithUpdatingActionSamplingRatio() {
ClusterSettings clusterSettings = new ClusterSettings(
Settings.EMPTY,
Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY)
Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY, TRACER_SPAN_SAMPLER_CLASSES)
);

TelemetrySettings telemetrySettings = new TelemetrySettings(Settings.EMPTY, clusterSettings);

// TransportActionSampler
TransportActionSampler transportActionSampler = new TransportActionSampler(telemetrySettings);
ProbabilisticTransportActionSampler probabilisticTransportActionSampler = new ProbabilisticTransportActionSampler(
telemetrySettings
);
clusterSettings.applySettings(Settings.builder().put("telemetry.tracer.action.sampler.probability", "1.0").build());

// Need to call shouldSample() to update the value of samplingRatio
SamplingResult result = transportActionSampler.shouldSample(
SamplingResult result = probabilisticTransportActionSampler.shouldSample(
mock(Context.class),
"00000000000000000000000000000000",
"spanName",
Expand All @@ -67,18 +55,19 @@ public void testGetSamplerWithUpdatedActionSamplingRatio() {
);
// Verify that TransportActionSampler returned SamplingResult.recordAndSample() as all actions will be sampled
assertEquals(SamplingResult.recordAndSample(), result);
assertEquals(1.0, transportActionSampler.getSamplingRatio(), 0.000d);
assertEquals(1.0, probabilisticTransportActionSampler.getSamplingRatio(), 0.000d);
assertEquals(SamplingResult.recordAndSample(), result);

clusterSettings.applySettings(Settings.builder().put("telemetry.tracer.action.sampler.probability", "0.5").build());
result = transportActionSampler.shouldSample(
clusterSettings.applySettings(Settings.builder().put("telemetry.tracer.action.sampler.probability", "0.0").build());
result = probabilisticTransportActionSampler.shouldSample(
mock(Context.class),
"00000000000000000000000000000000",
"spanName",
SpanKind.INTERNAL,
Attributes.builder().put(TRANSPORT_ACTION, "dummy_action").build(),
Collections.emptyList()
);
assertEquals(0.5, transportActionSampler.getSamplingRatio(), 0.000d);
assertEquals(0.0, probabilisticTransportActionSampler.getSamplingRatio(), 0.000d);
assertEquals(SamplingResult.drop(), result);
}

}
Loading

0 comments on commit 481bd69

Please sign in to comment.