Skip to content

Commit

Permalink
Fix default tracer flaky test (opensearch-project#9782)
Browse files Browse the repository at this point in the history
* Fix default tracer falky test

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

---------

Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
2 people authored and shiv0408 committed Apr 25, 2024
1 parent b74da5a commit ce8d95f
Showing 1 changed file with 38 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -252,42 +252,33 @@ public void testEndSpanByClosingSpanScopeMultiple() {
public void testSpanAcrossThreads() {
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
AtomicReference<Span> currentSpanRefThread1 = new AtomicReference<>();
AtomicReference<Span> currentSpanRefThread2 = new AtomicReference<>();
AtomicReference<Span> currentSpanRefAfterEndThread2 = new AtomicReference<>();

AtomicReference<Span> spanRef = new AtomicReference<>();
AtomicReference<Span> spanT2Ref = new AtomicReference<>();

ThreadContextBasedTracerContextStorage spanTracerStorage = new ThreadContextBasedTracerContextStorage(
threadContext,
tracingTelemetry
);
DefaultTracer defaultTracer = new DefaultTracer(tracingTelemetry, spanTracerStorage);

executorService.execute(() -> {
CompletableFuture<?> asyncTask = CompletableFuture.runAsync(() -> {
// create a span
Span span = defaultTracer.startSpan(new SpanCreationContext("span_name_t_1", Attributes.EMPTY));
SpanScope spanScope = defaultTracer.withSpanInScope(span);
spanRef.set(span);

executorService.execute(() -> {
CompletableFuture<?> asyncTask1 = CompletableFuture.runAsync(() -> {
Span spanT2 = defaultTracer.startSpan(new SpanCreationContext("span_name_t_2", Attributes.EMPTY));
SpanScope spanScopeT2 = defaultTracer.withSpanInScope(spanT2);
spanT2Ref.set(spanT2);

currentSpanRefThread2.set(defaultTracer.getCurrentSpan().getSpan());
assertEquals(spanT2, defaultTracer.getCurrentSpan().getSpan());

spanT2.endSpan();
spanScopeT2.close();
currentSpanRefAfterEndThread2.set(getCurrentSpanFromContext(defaultTracer));
});
spanT2.endSpan();
assertEquals(null, defaultTracer.getCurrentSpan());
}, executorService);
asyncTask1.join();
spanScope.close();
currentSpanRefThread1.set(getCurrentSpanFromContext(defaultTracer));
});
assertEquals(spanT2Ref.get(), currentSpanRefThread2.get());
assertEquals(spanRef.get(), currentSpanRefAfterEndThread2.get());
assertEquals(null, currentSpanRefThread1.get());
span.endSpan();
assertEquals(null, defaultTracer.getCurrentSpan());
}, executorService);
asyncTask.join();
}

public void testSpanCloseOnThread2() {
Expand All @@ -297,27 +288,27 @@ public void testSpanCloseOnThread2() {
threadContext,
tracingTelemetry
);
AtomicReference<SpanContext> currentSpanRefThread1 = new AtomicReference<>();
AtomicReference<SpanContext> currentSpanRefThread2 = new AtomicReference<>();
DefaultTracer defaultTracer = new DefaultTracer(tracingTelemetry, spanTracerStorage);
final Span span = defaultTracer.startSpan(new SpanCreationContext("span_name_t1", Attributes.EMPTY));
try (SpanScope spanScope = defaultTracer.withSpanInScope(span)) {
executorService.execute(() -> async(new ActionListener<Boolean>() {
CompletableFuture<?> asyncTask = CompletableFuture.runAsync(() -> async(new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean response) {
span.endSpan();
currentSpanRefThread2.set(defaultTracer.getCurrentSpan());
try (SpanScope s = defaultTracer.withSpanInScope(span)) {
assertEquals(span, defaultTracer.getCurrentSpan().getSpan());
} finally {
span.endSpan();
}
}

@Override
public void onFailure(Exception e) {

}
}));
currentSpanRefThread1.set(defaultTracer.getCurrentSpan());
}), executorService);
assertEquals(span, defaultTracer.getCurrentSpan().getSpan());
asyncTask.join();
}
assertEquals(null, currentSpanRefThread2.get());
assertEquals(span, currentSpanRefThread1.get().getSpan());
assertEquals(null, defaultTracer.getCurrentSpan());
}

Expand All @@ -337,57 +328,45 @@ private void async(ActionListener<Boolean> actionListener) {
public void testSpanAcrossThreadsMultipleSpans() {
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
AtomicReference<Span> currentSpanRefThread1 = new AtomicReference<>();
AtomicReference<Span> currentSpanRefThread2 = new AtomicReference<>();
AtomicReference<Span> currentSpanRefAfterEndThread2 = new AtomicReference<>();

AtomicReference<Span> parentSpanRef = new AtomicReference<>();
AtomicReference<Span> spanRef = new AtomicReference<>();
AtomicReference<Span> spanT2Ref = new AtomicReference<>();

ThreadContextBasedTracerContextStorage spanTracerStorage = new ThreadContextBasedTracerContextStorage(
threadContext,
tracingTelemetry
);
DefaultTracer defaultTracer = new DefaultTracer(tracingTelemetry, spanTracerStorage);

executorService.execute(() -> {
CompletableFuture<?> asyncTask = CompletableFuture.runAsync(() -> {
// create a parent span
Span parentSpan = defaultTracer.startSpan(new SpanCreationContext("p_span_name", Attributes.EMPTY));
SpanScope parentSpanScope = defaultTracer.withSpanInScope(parentSpan);
parentSpanRef.set(parentSpan);
// create a span
Span span = defaultTracer.startSpan(new SpanCreationContext("span_name_t_1", Attributes.EMPTY));
SpanScope spanScope = defaultTracer.withSpanInScope(span);
spanRef.set(span);

executorService.execute(() -> {
CompletableFuture<?> asyncTask1 = CompletableFuture.runAsync(() -> {
Span spanT2 = defaultTracer.startSpan(new SpanCreationContext("span_name_t_2", Attributes.EMPTY));
SpanScope spanScopeT2 = defaultTracer.withSpanInScope(spanT2);

Span spanT21 = defaultTracer.startSpan(new SpanCreationContext("span_name_t_2", Attributes.EMPTY));
SpanScope spanScopeT21 = defaultTracer.withSpanInScope(spanT2);
spanT2Ref.set(spanT21);
currentSpanRefThread2.set(defaultTracer.getCurrentSpan().getSpan());

spanT21.endSpan();
SpanScope spanScopeT21 = defaultTracer.withSpanInScope(spanT21);
assertEquals(spanT21, defaultTracer.getCurrentSpan().getSpan());
spanScopeT21.close();
spanT21.endSpan();

spanT2.endSpan();
spanScopeT2.close();
currentSpanRefAfterEndThread2.set(getCurrentSpanFromContext(defaultTracer));
});
spanT2.endSpan();

assertEquals(null, defaultTracer.getCurrentSpan());
}, executorService);

asyncTask1.join();

spanScope.close();
span.endSpan();
parentSpanScope.close();
currentSpanRefThread1.set(getCurrentSpanFromContext(defaultTracer));
});
assertEquals(spanT2Ref.get(), currentSpanRefThread2.get());
assertEquals(spanRef.get(), currentSpanRefAfterEndThread2.get());
assertEquals(null, currentSpanRefThread1.get());
}

private static Span getCurrentSpanFromContext(DefaultTracer defaultTracer) {
return defaultTracer.getCurrentSpan() != null ? defaultTracer.getCurrentSpan().getSpan() : null;
parentSpan.endSpan();
assertEquals(null, defaultTracer.getCurrentSpan());
}, executorService);
asyncTask.join();
}

public void testClose() throws IOException {
Expand Down

0 comments on commit ce8d95f

Please sign in to comment.