Skip to content

Commit

Permalink
[BUG] The thread context is not properly cleared and messes up the tr…
Browse files Browse the repository at this point in the history
…aces

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Oct 24, 2023
1 parent 5bd413c commit c1bbfbb
Show file tree
Hide file tree
Showing 19 changed files with 256 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
- Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993))
- [BUG] The thread context is not properly cleared and messes up the traces ([#10873](https://github.com/opensearch-project/OpenSearch/pull/10873))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
class DefaultSpanScope implements SpanScope {
private final Span span;
private final SpanScope previousSpanScope;
private static final ThreadLocal<SpanScope> spanScopeThreadLocal = new ThreadLocal<>();
private final TracerContextStorage<String, Span> tracerContextStorage;
private static final ThreadLocal<SpanScope> spanScopeThreadLocal = new ThreadLocal<>();

/**
* Constructor
Expand All @@ -42,25 +42,24 @@ private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextSt
* @return SpanScope spanScope
*/
public static SpanScope create(Span span, TracerContextStorage<String, Span> tracerContextStorage) {
final SpanScope beforeSpanScope = spanScopeThreadLocal.get();
SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage);
spanScopeThreadLocal.set(newSpanScope);
return newSpanScope;
final SpanScope previousSpanScope = spanScopeThreadLocal.get();
return new DefaultSpanScope(span, previousSpanScope, tracerContextStorage);
}

@Override
public void close() {
detach();
spanScopeThreadLocal.set(previousSpanScope);
}

@Override
public SpanScope attach() {
spanScopeThreadLocal.set(this);
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span);
return this;
}

private void detach() {
spanScopeThreadLocal.set(previousSpanScope);
if (previousSpanScope != null) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@

import org.opensearch.common.annotation.InternalApi;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -53,14 +52,13 @@ public Span startSpan(SpanCreationContext context) {
parentSpan = getCurrentSpanInternal();
}
Span span = createSpan(context, parentSpan);
setCurrentSpanInContext(span);
addDefaultAttributes(span);
return span;
}

@Override
public void close() throws IOException {
((Closeable) tracingTelemetry).close();
tracingTelemetry.close();
}

private Span getCurrentSpanInternal() {
Expand All @@ -69,7 +67,7 @@ private Span getCurrentSpanInternal() {

@Override
public SpanContext getCurrentSpan() {
final Span currentSpan = tracerContextStorage.get(TracerContextStorage.CURRENT_SPAN);
final Span currentSpan = getCurrentSpanInternal();
return (currentSpan == null) ? null : new SpanContext(currentSpan);
}

Expand All @@ -94,10 +92,6 @@ private Span createSpan(SpanCreationContext spanCreationContext, Span parentSpan
return tracingTelemetry.createSpan(spanCreationContext, parentSpan);
}

private void setCurrentSpanInContext(Span span) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, span);
}

/**
* Adds default attributes in the span
* @param span the current active span
Expand All @@ -107,7 +101,7 @@ protected void addDefaultAttributes(Span span) {
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> headers) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@InternalApi
final class SpanReference {

private Span span;
private volatile Span span;

/**
* Creates the wrapper with given span
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.http.HttpTracer;
import org.opensearch.telemetry.tracing.transport.TransportTracer;

import java.io.Closeable;

Expand All @@ -22,7 +22,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
public interface Tracer extends HttpTracer, Closeable {
public interface Tracer extends TransportTracer, Closeable {
/**
* Starts the {@link Span} with given {@link SpanCreationContext}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand All @@ -36,7 +36,7 @@ public interface TracingContextPropagator {
* @param headers request headers to extract the context from
* @return current span
*/
Optional<Span> extractFromHeaders(Map<String, List<String>> headers);
Optional<Span> extractFromHeaders(Map<String, Collection<String>> headers);

/**
* Injects tracing context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;

import java.util.List;
import java.util.Collection;
import java.util.Map;

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ public void close() {
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> header) {
return NoopSpan.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.http;
package org.opensearch.telemetry.tracing.transport;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanCreationContext;

import java.util.List;
import java.util.Collection;
import java.util.Map;

/**
* HttpTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HttpRequest header and propagate the span accordingly.
* TransportTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HTTP or TCP transport headers and propagate the span accordingly.
* <p>
* All methods on the Tracer object are multi-thread safe.
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface HttpTracer {
public interface TransportTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanCreationContext span name.
* @param header http request header.
* @return span.
* @param headers transport headers
* @return the span instance
*/
Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header);
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
*/

/**
* Contains No-op implementations
* Contains TCP transport related tracer capabilities
*/
package org.opensearch.telemetry.tracing.http;
package org.opensearch.telemetry.tracing.transport;
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -35,7 +37,6 @@ public class DefaultTracerTests extends OpenSearchTestCase {
private Span mockSpan;
private Span mockParentSpan;

private SpanScope mockSpanScope;
private ThreadPool threadPool;
private ExecutorService executorService;
private SpanCreationContext spanCreationContext;
Expand Down Expand Up @@ -102,11 +103,11 @@ public void testCreateSpanWithAttributes() {

Span span = defaultTracer.startSpan(spanCreationContext);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(1.0, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key1"));
assertEquals(2l, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key2"));
assertEquals(true, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key3"));
assertEquals("key4", ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key4"));
assertThat(defaultTracer.getCurrentSpan(), is(nullValue()));
assertEquals(1.0, ((MockSpan) span).getAttribute("key1"));
assertEquals(2l, ((MockSpan) span).getAttribute("key2"));
assertEquals(true, ((MockSpan) span).getAttribute("key3"));
assertEquals("key4", ((MockSpan) span).getAttribute("key4"));
span.endSpan();
}

Expand All @@ -121,16 +122,18 @@ public void testCreateSpanWithParent() {

Span span = defaultTracer.startSpan(spanCreationContext, null);

SpanContext parentSpan = defaultTracer.getCurrentSpan();

SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan());
try (final SpanScope scope = defaultTracer.withSpanInScope(span)) {
SpanContext parentSpan = defaultTracer.getCurrentSpan();

Span span1 = defaultTracer.startSpan(spanCreationContext1);
SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan());

assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
span1.endSpan();
span.endSpan();
try (final ScopedSpan span1 = defaultTracer.startScopedSpan(spanCreationContext1)) {
assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
}
} finally {
span.endSpan();
}
}

@SuppressWarnings("unchecked")
Expand All @@ -155,8 +158,7 @@ public void testCreateSpanWithNullParent() {

Span span = defaultTracer.startSpan(spanCreationContext);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());
assertThat(defaultTracer.getCurrentSpan(), is(nullValue()));
span.endSpan();
}

Expand Down Expand Up @@ -403,7 +405,6 @@ private void setupMocks() {
mockTracingTelemetry = mock(TracingTelemetry.class);
mockSpan = mock(Span.class);
mockParentSpan = mock(Span.class);
mockSpanScope = mock(SpanScope.class);
mockTracerContextStorage = mock(TracerContextStorage.class);
when(mockSpan.getSpanName()).thenReturn("span_name");
when(mockSpan.getSpanId()).thenReturn("span_id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

import org.opensearch.core.common.Strings;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -51,7 +51,7 @@ private static OTelPropagatedSpan getPropagatedSpan(Context context) {
}

@Override
public Optional<Span> extractFromHeaders(Map<String, List<String>> headers) {
public Optional<Span> extractFromHeaders(Map<String, Collection<String>> headers) {
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER);
return Optional.ofNullable(getPropagatedSpan(context));
}
Expand Down Expand Up @@ -87,9 +87,9 @@ public String get(Map<String, String> headers, String key) {
}
};

private static final TextMapGetter<Map<String, List<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
private static final TextMapGetter<Map<String, Collection<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
@Override
public Iterable<String> keys(Map<String, List<String>> headers) {
public Iterable<String> keys(Map<String, Collection<String>> headers) {
if (headers != null) {
return headers.keySet();
} else {
Expand All @@ -98,7 +98,7 @@ public Iterable<String> keys(Map<String, List<String>> headers) {
}

@Override
public String get(Map<String, List<String>> headers, String key) {
public String get(Map<String, Collection<String>> headers, String key) {
if (headers != null && headers.containsKey(key)) {
return Strings.collectionToCommaDelimitedString(headers.get(key));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.opentelemetry.api.OpenTelemetry;
Expand Down Expand Up @@ -57,7 +57,7 @@ public void testExtractTracerContextFromHeader() {
}

public void testExtractTracerContextFromHttpHeader() {
Map<String, List<String>> requestHeaders = new HashMap<>();
Map<String, Collection<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00"));
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -362,7 +364,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) {
* @param httpChannel that received the http request
*/
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), httpRequest.getHeaders());
final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), extractHeaders(httpRequest.getHeaders()));
try (final SpanScope httpRequestSpanScope = tracer.withSpanInScope(span)) {
HttpChannel traceableHttpChannel = TraceableHttpChannel.create(httpChannel, span, tracer);
handleIncomingRequest(httpRequest, traceableHttpChannel, httpRequest.getInboundException());
Expand Down Expand Up @@ -483,4 +485,9 @@ private static ActionListener<Void> earlyResponseListener(HttpRequest request, H
return NO_OP;
}
}

@SuppressWarnings("unchecked")
private static <Values extends Collection<String>> Map<String, Collection<String>> extractHeaders(Map<String, Values> headers) {
return (Map<String, Collection<String>>) headers;
}
}
Loading

0 comments on commit c1bbfbb

Please sign in to comment.