Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] [2.x] Add support to use trace propagated from client (#9506) #9581

Merged
merged 2 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility ([#9262](https://github.com/opensearch-project/OpenSearch/pull/9262))
- Fix sort related ITs for concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9466)
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Add support to use trace propagated from client ([#9506](https://github.com/opensearch-project/OpenSearch/pull/9506))
- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
*
Expand Down Expand Up @@ -44,7 +47,7 @@ public SpanScope startSpan(String spanName) {

@Override
public SpanScope startSpan(String spanName, Attributes attributes) {
return startSpan(spanName, null, attributes);
return startSpan(spanName, (SpanContext) null, attributes);
}

@Override
Expand Down Expand Up @@ -97,4 +100,10 @@ protected void addDefaultAttributes(Span span) {
span.addAttribute(THREAD_NAME, Thread.currentThread().getName());
}

@Override
public SpanScope startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.http.HttpTracer;

import java.io.Closeable;

Expand All @@ -18,7 +19,7 @@
*
* All methods on the Tracer object are multi-thread safe.
*/
public interface Tracer extends Closeable {
public interface Tracer extends HttpTracer, Closeable {

/**
* Starts the {@link Span} with given name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

package org.opensearch.telemetry.tracing;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

/**
Expand All @@ -23,7 +25,15 @@ public interface TracingContextPropagator {
* @param props properties
* @return current span
*/
Span extract(Map<String, String> props);
Optional<Span> extract(Map<String, String> props);

/**
* Extracts current span from HTTP headers.
*
* @param headers request headers to extract the context from
* @return current span
*/
Optional<Span> extractFromHeaders(Map<String, List<String>> headers);

/**
* Injects tracing context
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.http;

import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.attributes.Attributes;

import java.util.List;
import java.util.Map;

/**
* HttpTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HttpRequest header and propagate the span accordingly.
*
* All methods on the Tracer object are multi-thread safe.
*/
public interface HttpTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanName span name.
* @param header http request header.
* @param attributes span attributes.
* @return scope of the span, must be closed with explicit close or with try-with-resource
*/
SpanScope startSpan(String spanName, Map<String, List<String>> header, Attributes attributes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains No-op implementations
*/
package org.opensearch.telemetry.tracing.http;
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.attributes.Attributes;

import java.util.List;
import java.util.Map;

/**
* No-op implementation of Tracer
*
Expand Down Expand Up @@ -51,4 +54,9 @@ public SpanContext getCurrentSpan() {
public void close() {

}

@Override
public SpanScope startSpan(String spanName, Map<String, List<String>> header, Attributes attributes) {
return SpanScope.NO_OP;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import org.junit.Assert;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -104,14 +108,36 @@ public void testCreateSpanWithParent() {
Assert.assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
}

public void testHttpTracer() {
String traceId = "trace_id";
String spanId = "span_id";
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();

DefaultTracer defaultTracer = new DefaultTracer(
tracingTelemetry,
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
);

Map<String, List<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList(traceId + "~" + spanId));

SpanScope spanScope = defaultTracer.startSpan("test_span", requestHeaders, Attributes.EMPTY);
SpanContext currentSpan = defaultTracer.getCurrentSpan();
assertNotNull(currentSpan);
assertEquals(traceId, currentSpan.getSpan().getTraceId());
assertEquals(traceId, currentSpan.getSpan().getParentSpan().getTraceId());
assertEquals(spanId, currentSpan.getSpan().getParentSpan().getSpanId());
spanScope.close();
}

public void testCreateSpanWithNullParent() {
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
DefaultTracer defaultTracer = new DefaultTracer(
tracingTelemetry,
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
);

defaultTracer.startSpan("span_name", null, Attributes.EMPTY);
defaultTracer.startSpan("span_name");

Assert.assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
Assert.assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ public void testRunnableWithParent() throws Exception {
DefaultTracer defaultTracer = new DefaultTracer(new MockTracingTelemetry(), contextStorage);
defaultTracer.startSpan(parentSpanName);
SpanContext parentSpan = defaultTracer.getCurrentSpan();
AtomicReference<SpanContext> currrntSpan = new AtomicReference<>(new SpanContext(null));
AtomicReference<SpanContext> currentSpan = new AtomicReference<>();
final AtomicBoolean isRunnableCompleted = new AtomicBoolean(false);
TraceableRunnable traceableRunnable = new TraceableRunnable(defaultTracer, spanName, parentSpan, Attributes.EMPTY, () -> {
isRunnableCompleted.set(true);
currrntSpan.set(defaultTracer.getCurrentSpan());
currentSpan.set(defaultTracer.getCurrentSpan());
});
traceableRunnable.run();
assertTrue(isRunnableCompleted.get());
assertEquals(spanName, currrntSpan.get().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), currrntSpan.get().getSpan().getParentSpan());
assertEquals(spanName, currentSpan.get().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), currentSpan.get().getSpan().getParentSpan());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.core.common.Strings;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

import io.opentelemetry.api.OpenTelemetry;
Expand All @@ -32,15 +37,25 @@ public OTelTracingContextPropagator(OpenTelemetry openTelemetry) {
}

@Override
public Span extract(Map<String, String> props) {
public Optional<Span> extract(Map<String, String> props) {
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), props, TEXT_MAP_GETTER);
return Optional.ofNullable(getPropagatedSpan(context));
}

private static OTelPropagatedSpan getPropagatedSpan(Context context) {
if (context != null) {
io.opentelemetry.api.trace.Span span = io.opentelemetry.api.trace.Span.fromContext(context);
return new OTelPropagatedSpan(span);
}
return null;
}

@Override
public Optional<Span> extractFromHeaders(Map<String, List<String>> headers) {
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER);
return Optional.ofNullable(getPropagatedSpan(context));
}

@Override
public void inject(Span currentSpan, BiConsumer<String, String> setter) {
openTelemetry.getPropagators().getTextMapPropagator().inject(context((OTelSpan) currentSpan), setter, TEXT_MAP_SETTER);
Expand Down Expand Up @@ -72,4 +87,23 @@ public String get(Map<String, String> headers, String key) {
}
};

private static final TextMapGetter<Map<String, List<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
@Override
public Iterable<String> keys(Map<String, List<String>> headers) {
if (headers != null) {
return headers.keySet();
} else {
return Collections.emptySet();
}
}

@Override
public String get(Map<String, List<String>> headers, String key) {
if (headers != null && headers.containsKey(key)) {
return Strings.collectionToCommaDelimitedString(headers.get(key));
}
return null;
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.opentelemetry.api.OpenTelemetry;
Expand All @@ -19,6 +21,7 @@
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -48,8 +51,39 @@ public void testExtractTracerContextFromHeader() {
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders).orElse(null);
assertEquals(TRACE_ID, span.getTraceId());
assertEquals(SPAN_ID, span.getSpanId());
}

public void testExtractTracerContextFromHttpHeader() {
Map<String, List<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00"));
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(requestHeaders).get();
assertEquals(TRACE_ID, span.getTraceId());
assertEquals(SPAN_ID, span.getSpanId());
}

public void testExtractTracerContextFromHttpHeaderNull() {
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(null).get();
org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root()));
assertEquals(propagatedSpan.getTraceId(), span.getTraceId());
assertEquals(propagatedSpan.getSpanId(), span.getSpanId());
}

public void testExtractTracerContextFromHttpHeaderEmpty() {
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(new HashMap<>()).get();
org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root()));
assertEquals(propagatedSpan.getTraceId(), span.getTraceId());
assertEquals(propagatedSpan.getSpanId(), span.getSpanId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private Optional<Span> spanFromThreadContext(String key) {
}

private Span spanFromHeader() {
return tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders());
Optional<Span> span = tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders());
return span.orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Wrapper implementation of Tracer. This delegates call to right tracer based on the tracer settings
Expand Down Expand Up @@ -42,7 +44,7 @@ public SpanScope startSpan(String spanName) {

@Override
public SpanScope startSpan(String spanName, Attributes attributes) {
return startSpan(spanName, null, attributes);
return startSpan(spanName, (SpanContext) null, attributes);
}

@Override
Expand All @@ -66,4 +68,9 @@ public void close() throws IOException {
Tracer getDelegateTracer() {
return telemetrySettings.isTracingEnabled() ? defaultTracer : NoopTracer.INSTANCE;
}

@Override
public SpanScope startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
return defaultTracer.startSpan(spanName, headers, attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracer() throws Excepti
wrappedTracer.startSpan("foo");

assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer);
verify(mockDefaultTracer).startSpan(eq("foo"), eq(null), any(Attributes.class));
verify(mockDefaultTracer).startSpan(eq("foo"), eq((SpanContext) null), any(Attributes.class));
}
}

Expand All @@ -64,7 +64,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracerWithAttr() throws
wrappedTracer.startSpan("foo", attributes);

assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer);
verify(mockDefaultTracer).startSpan("foo", null, attributes);
verify(mockDefaultTracer).startSpan("foo", (SpanContext) null, attributes);
}
}

Expand Down
Loading