Skip to content

Commit

Permalink
Added sampler based on Blanket Probabilistic Sampling rate and Overri…
Browse files Browse the repository at this point in the history
…de for on demand (opensearch-project#9522)

Signed-off-by: Dev Agarwal <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
devagarwal1803 authored and shiv0408 committed Apr 25, 2024
1 parent 2d10cc2 commit 25a2624
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public IntegrationTestOTelTelemetryPlugin(Settings settings) {
/**
* This method overrides getTelemetry() method in OTel plugin class, so we create only one instance of global OpenTelemetry
* resetForTest() will set OpenTelemetry to null again.
* @param settings cluster settings
* @param telemetrySettings telemetry settings
*/
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
GlobalOpenTelemetry.resetForTest();
return super.getTelemetry(settings);
return super.getTelemetry(telemetrySettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ public List<Setting<?>> getSettings() {
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(telemetry());
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
return Optional.of(telemetry(telemetrySettings));
}

@Override
public String getName() {
return OTEL_TRACER_NAME;
}

private Telemetry telemetry() {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(settings)), new MetricsTelemetry() {
private Telemetry telemetry(TelemetrySettings telemetrySettings) {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(telemetrySettings, settings)), new MetricsTelemetry() {
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;
import org.opensearch.telemetry.tracing.sampler.RequestSampler;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -37,15 +40,16 @@ private OTelResourceProvider() {}

/**
* Creates OpenTelemetry instance with default configuration
* @param telemetrySettings telemetry settings
* @param settings cluster settings
* @return OpenTelemetry instance
*/
public static OpenTelemetry get(Settings settings) {
public static OpenTelemetry get(TelemetrySettings telemetrySettings, Settings settings) {
return get(
settings,
OTelSpanExporterFactory.create(settings),
ContextPropagators.create(W3CTraceContextPropagator.getInstance()),
Sampler.alwaysOn()
Sampler.parentBased(new RequestSampler(new ProbabilisticSampler(telemetrySettings)))
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.telemetry.TelemetrySettings;

import java.util.List;
import java.util.Objects;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

/**
* ProbabilisticSampler implements a head-based sampling strategy based on provided settings.
*/
public class ProbabilisticSampler implements Sampler {
private Sampler defaultSampler;
private final TelemetrySettings telemetrySettings;
private double samplingRatio;

/**
* Constructor
*
* @param telemetrySettings Telemetry settings.
*/
public ProbabilisticSampler(TelemetrySettings telemetrySettings) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.samplingRatio = telemetrySettings.getTracerHeadSamplerSamplingRatio();
this.defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}

Sampler getSampler() {
double newSamplingRatio = telemetrySettings.getTracerHeadSamplerSamplingRatio();
if (isSamplingRatioChanged(newSamplingRatio)) {
synchronized (this) {
this.samplingRatio = newSamplingRatio;
defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}
}
return defaultSampler;
}

private boolean isSamplingRatioChanged(double newSamplingRatio) {
return Double.compare(this.samplingRatio, newSamplingRatio) != 0;
}

double getSamplingRatio() {
return samplingRatio;
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks
) {
return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

@Override
public String getDescription() {
return "Probabilistic Sampler";
}

@Override
public String toString() {
return getDescription();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import java.util.List;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

/**
* HeadBased sampler
*/
public class RequestSampler implements Sampler {
private final Sampler defaultSampler;

// TODO: Pick value of TRACE from PR #9415.
private static final String TRACE = "trace";

/**
* Creates Head based sampler
* @param defaultSampler defaultSampler
*/
public RequestSampler(Sampler defaultSampler) {
this.defaultSampler = defaultSampler;
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks
) {

final String trace = attributes.get(AttributeKey.stringKey(TRACE));

if (trace != null) {
return (Boolean.parseBoolean(trace) == true) ? SamplingResult.recordAndSample() : SamplingResult.drop();
} else {
return defaultSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

}

@Override
public String getDescription() {
return "Request Sampler";
}

@Override
public String toString() {
return getDescription();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for sampler.
*/
package org.opensearch.telemetry.tracing.sampler;
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_BATCH_SIZE_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;

public class OTelTelemetryPluginTests extends OpenSearchTestCase {

Expand All @@ -42,7 +44,9 @@ public void setup() {
// io.opentelemetry.sdk.OpenTelemetrySdk.close waits only for 10 seconds for shutdown to complete.
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
oTelTracerModulePlugin = new OTelTelemetryPlugin(settings);
telemetry = oTelTracerModulePlugin.getTelemetry(null);
telemetry = oTelTracerModulePlugin.getTelemetry(
new TelemetrySettings(Settings.EMPTY, new ClusterSettings(settings, Set.of(TRACER_ENABLED_SETTING, TRACER_SAMPLER_PROBABILITY)))
);
tracingTelemetry = telemetry.get().getTracingTelemetry();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Set;

import io.opentelemetry.sdk.trace.samplers.Sampler;

import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;

public class ProbabilisticSamplerTests extends OpenSearchTestCase {

// When ProbabilisticSampler is created with OTelTelemetrySettings as null
public void testProbabilisticSamplerWithNullSettings() {
// Verify that the constructor throws IllegalArgumentException when given null settings
assertThrows(NullPointerException.class, () -> { new ProbabilisticSampler(null); });
}

public void testDefaultGetSampler() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);

assertNotNull(probabilisticSampler.getSampler());
assertEquals(0.01, probabilisticSampler.getSamplingRatio(), 0.0d);
}

public void testGetSamplerWithUpdatedSamplingRatio() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);
assertEquals(0.01d, probabilisticSampler.getSamplingRatio(), 0.0d);

telemetrySettings.setSamplingProbability(0.02);

// Need to call getSampler() to update the value of tracerHeadSamplerSamplingRatio
Sampler updatedProbabilisticSampler = probabilisticSampler.getSampler();
assertEquals(0.02, probabilisticSampler.getSamplingRatio(), 0.0d);
}

}
Loading

0 comments on commit 25a2624

Please sign in to comment.