Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new inferred sampler #12927

Closed
wants to merge 14 commits into from
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,24 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Tiered caching] Introducing cache plugins and exposing Ehcache as one of the pluggable disk cache option ([#11874](https://github.com/opensearch-project/OpenSearch/pull/11874))
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
- Add support for dependencies in plugin descriptor properties with semver range ([#11441](https://github.com/opensearch-project/OpenSearch/pull/11441))
- Add community_id ingest processor ([#12121](https://github.com/opensearch-project/OpenSearch/pull/12121))
- Introduce query level setting `index.query.max_nested_depth` limiting nested queries ([#3268](https://github.com/opensearch-project/OpenSearch/issues/3268)
- Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163))
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269))
- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642))
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Tracing Framework] Adds support for inferred sampling ([#12315](https://github.com/opensearch-project/OpenSearch/issues/12315))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,24 @@

private TransportAddress remoteAddress;

private boolean sampled;

public void remoteAddress(TransportAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}

public void markSampled() {
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
this.sampled = true;
}

Check warning on line 56 in libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java#L55-L56

Added lines #L55 - L56 were not covered by tests

public TransportAddress remoteAddress() {
return remoteAddress;
}

public boolean sampled() {
return sampled;
}

/**
* Constructs a new empty transport message
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.annotation.InternalApi;
import org.opensearch.telemetry.tracing.attributes.AttributeType;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -24,6 +27,7 @@
private final Span beforeSpan;
private static final ThreadLocal<SpanScope> spanScopeThreadLocal = new ThreadLocal<>();
private final TracerContextStorage<String, Span> tracerContextStorage;
private final Map<String, AttributeType> commonAttributeMap = new HashMap<>();

/**
* Constructor
Expand All @@ -40,6 +44,7 @@
this.beforeSpan = beforeSpan;
this.previousSpanScope = previousSpanScope;
this.tracerContextStorage = tracerContextStorage;
initializeCommonPropagationAttributes();
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -55,6 +60,13 @@
return newSpanScope;
}

/**
* Common attributes need to be taken from parent and propagated to child span*
*/
private void initializeCommonPropagationAttributes() {
commonAttributeMap.put(TracerContextStorage.INFERRED_SAMPLER, AttributeType.BOOLEAN);
}

@Override
public void close() {
detach();
Expand All @@ -64,9 +76,38 @@
public SpanScope attach() {
spanScopeThreadLocal.set(this);
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span);
addCommonParentAttributes();
return this;
}

private void addCommonParentAttributes() {
// This work as common attribute propagator from parent to child
for (String attribute : commonAttributeMap.keySet()) {
if (beforeSpan != null && beforeSpan.getAttributes().containsKey(attribute)) {
AttributeType attributeValue = commonAttributeMap.get(attribute);
this.storeAttributeValue(attribute, attributeValue);

Check warning on line 88 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java#L87-L88

Added lines #L87 - L88 were not covered by tests
}
}
}

private void storeAttributeValue(String attribute, AttributeType attributeType) {
switch (attributeType) {
case BOOLEAN:
span.addAttribute(attribute, (Boolean) beforeSpan.getAttribute(attribute));
break;

Check warning on line 97 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java#L96-L97

Added lines #L96 - L97 were not covered by tests
case LONG:
span.addAttribute(attribute, (Long) beforeSpan.getAttribute(attribute));
break;

Check warning on line 100 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java#L99-L100

Added lines #L99 - L100 were not covered by tests
case STRING:
span.addAttribute(attribute, (String) beforeSpan.getAttribute(attribute));
break;

Check warning on line 103 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java#L102-L103

Added lines #L102 - L103 were not covered by tests
case DOUBLE:
span.addAttribute(attribute, (Double) beforeSpan.getAttribute(attribute));

Check warning on line 105 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java#L105

Added line #L105 was not covered by tests
break;
// Add more cases for other types if needed
}
}

Check warning on line 109 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java#L109

Added line #L109 was not covered by tests

private void detach() {
spanScopeThreadLocal.set(previousSpanScope);
if (beforeSpan != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@
@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
addRequestAttributeToContext(spanCreationContext, headers);
return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null)));
}

private void addRequestAttributeToContext(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
if (headers != null && headers.containsKey(TracerContextStorage.INFERRED_SAMPLER)) {
spanCreationContext.getAttributes().addAttribute(TracerContextStorage.INFERRED_SAMPLER, true);

Check warning on line 113 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java#L113

Added line #L113 was not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.Map;

/**
* An interface that represents a tracing span.
* Spans are created by the Tracer.startSpan method.
Expand Down Expand Up @@ -93,4 +95,16 @@ public interface Span {
*/
String getSpanId();

/**
* Returns attribute.
* @param key key
* @return value
*/
Object getAttribute(String key);

/**
* Returns the attributes as map.
* @return returns the attributes map.
*/
Map<String, Object> getAttributes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ public interface TracerContextStorage<K, V> {
*/
String CURRENT_SPAN = "current_span";

/**
* Key for storing sample information
*/
String SAMPLED = "sampled";
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Key for storing inferred Sampling information
*/
String INFERRED_SAMPLER = "Inferred_sampler";

/**
* Fetches value corresponding to key
* @param key of the tracing context
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.attributes;

import org.opensearch.common.annotation.PublicApi;

/**
* Type of Attribute.
*/
@PublicApi(since = "2.14.0")
public enum AttributeType {
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Attribute type represents the Boolean attribute.
*/
BOOLEAN,

/**
* Attribute type represents the Long attribute.
*/
LONG,

/**
* Attribute type represents the String attribute.
*/
STRING,

/**
* Attribute type represents the Double attribute.
*/
DOUBLE
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.telemetry.tracing.Span;

import java.util.Map;

/**
* No-op implementation of {@link org.opensearch.telemetry.tracing.Span}
*
Expand Down Expand Up @@ -82,4 +84,25 @@
public String getSpanId() {
return "noop-span-id";
}

/**
* Returns attribute.
*
* @param key key
* @return value
*/
@Override
public Object getAttribute(String key) {
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
return null;

Check warning on line 96 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java#L96

Added line #L96 was not covered by tests
}

/**
* Returns the attributes as map.
*
* @return returns the attributes map.
*/
@Override
public Map<String, Object> getAttributes() {
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
return null;

Check warning on line 106 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java#L106

Added line #L106 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.metrics.exporter.OTelMetricsExporterFactory;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.processor.OtelSpanProcessor;
import org.opensearch.telemetry.tracing.sampler.OTelSamplerFactory;
import org.opensearch.telemetry.tracing.sampler.RequestSampler;

Expand Down Expand Up @@ -117,7 +118,11 @@ private static SdkTracerProvider createSdkTracerProvider(
.build();
}

private static BatchSpanProcessor spanProcessor(Settings settings, SpanExporter spanExporter) {
private static OtelSpanProcessor spanProcessor(Settings settings, SpanExporter spanExporter) {
return new OtelSpanProcessor(batchSpanProcessor(settings, spanExporter));
}

private static BatchSpanProcessor batchSpanProcessor(Settings settings, SpanExporter spanExporter) {
return BatchSpanProcessor.builder(spanExporter)
.setScheduleDelay(TRACER_EXPORTER_DELAY_SETTING.get(settings).getSeconds(), TimeUnit.SECONDS)
.setMaxExportBatchSize(TRACER_EXPORTER_BATCH_SIZE_SETTING.get(settings))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.telemetry.tracing;

import java.util.HashMap;
import java.util.Map;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;

Expand All @@ -19,6 +22,8 @@

private final Span delegateSpan;

private final Map<String, Object> metadata;

/**
* Constructor
* @param spanName span name
Expand All @@ -28,31 +33,47 @@
public OTelSpan(String spanName, Span span, org.opensearch.telemetry.tracing.Span parentSpan) {
super(spanName, parentSpan);
this.delegateSpan = span;
this.metadata = new HashMap<>();
}

@Override
public void endSpan() {
if (getAttributes().containsKey(TracerContextStorage.SAMPLED)) {
markParentForSampling();

Check warning on line 42 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L42

Added line #L42 was not covered by tests
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
}
delegateSpan.end();
}

private void markParentForSampling() {
org.opensearch.telemetry.tracing.Span current_parent = getParentSpan();

Check warning on line 48 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L48

Added line #L48 was not covered by tests
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
while (current_parent != null && !current_parent.getAttributes().containsKey(TracerContextStorage.SAMPLED)) {
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
current_parent.addAttribute(TracerContextStorage.SAMPLED, true);
current_parent = current_parent.getParentSpan();

Check warning on line 51 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L50-L51

Added lines #L50 - L51 were not covered by tests
}
}

Check warning on line 53 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L53

Added line #L53 was not covered by tests

@Override
public void addAttribute(String key, String value) {
delegateSpan.setAttribute(key, value);
metadata.put(key, value);
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void addAttribute(String key, Long value) {
delegateSpan.setAttribute(key, value);
metadata.put(key, value);
}

@Override
public void addAttribute(String key, Double value) {
delegateSpan.setAttribute(key, value);
metadata.put(key, value);
}

@Override
public void addAttribute(String key, Boolean value) {
delegateSpan.setAttribute(key, value);
metadata.put(key, value);
}

@Override
Expand All @@ -77,8 +98,17 @@
return delegateSpan.getSpanContext().getSpanId();
}

@Override
public Object getAttribute(String key) {
return metadata.get(key);

Check warning on line 103 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L103

Added line #L103 was not covered by tests
}

@Override
public Map<String, Object> getAttributes() {
return metadata;
}

io.opentelemetry.api.trace.Span getDelegateSpan() {
return delegateSpan;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.Closeable;
import java.io.IOException;

import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.OpenTelemetrySdk;
Expand Down Expand Up @@ -63,9 +64,22 @@
OTelSpanKindConverter.convert(spanCreationContext.getSpanKind())
);
Span newSpan = new OTelSpan(spanCreationContext.getSpanName(), otelSpan, parentSpan);

// This attribute is added for inferred sampling
addInferredAttribute(newSpan, spanCreationContext);

return newSpan;
}

private void addInferredAttribute(Span newSpan, SpanCreationContext spanCreationContext) {
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
// If the current context has this attribute we need to add the same to the span as well.
if (Baggage.current().getEntryValue(TracerContextStorage.INFERRED_SAMPLER) != null
|| (spanCreationContext.getAttributes() != null
&& spanCreationContext.getAttributes().getAttributesMap().containsKey(TracerContextStorage.INFERRED_SAMPLER))) {
newSpan.addAttribute(TracerContextStorage.INFERRED_SAMPLER, true);

Check warning on line 79 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingTelemetry.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingTelemetry.java#L79

Added line #L79 was not covered by tests
}
}

io.opentelemetry.api.trace.Span otelSpan(
String spanName,
Span parentOTelSpan,
Expand Down
Loading
Loading