From 6832da5d470b26403df30a75f73d9ccfb1bfba33 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Thu, 4 Jan 2024 16:22:04 +0530 Subject: [PATCH 01/10] Clear transient header from system context Signed-off-by: Gagan Juneja --- .../common/util/concurrent/ThreadContext.java | 12 +++++++++++- .../ThreadContextStatePropagator.java | 10 ++++++++++ .../tasks/TaskThreadContextStatePropagator.java | 5 +++++ .../opensearch/transport/TransportService.java | 17 ++++------------- 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index 3da21a6777456..ff871c96e1ad9 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -534,7 +534,17 @@ boolean isDefaultContext() { * by the system itself rather than by a user action. */ public void markAsSystemContext() { - threadLocal.set(threadLocal.get().setSystemContext()); + ThreadContextStruct threadContextStruct = threadLocal.get(); + final Map transients = new HashMap<>(); + propagators.forEach(p -> transients.putAll(p.transientsForSystemContext(threadContextStruct.transientHeaders))); + ThreadContextStruct newThreadContextStruct = new ThreadContextStruct( + threadContextStruct.requestHeaders, + threadContextStruct.responseHeaders, + transients, + threadContextStruct.persistentHeaders, + threadContextStruct.isSystemContext + ); + threadLocal.set(newThreadContextStruct.setSystemContext()); } /** diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java index dac70b0e8124e..73346cdca256b 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java @@ -10,6 +10,7 @@ import org.opensearch.common.annotation.PublicApi; +import java.util.HashMap; import java.util.Map; /** @@ -27,6 +28,15 @@ public interface ThreadContextStatePropagator { */ Map transients(Map source); + /** + * Returns the list of transient headers that need to be propagated to the child system context. + * @param source current context transient headers + * @return the list of transient headers that needs to be propagated from current context to new thread context + */ + default Map transientsForSystemContext(Map source) { + return new HashMap<>(); + } + /** * Returns the list of request headers that needs to be propagated from current context to request. * @param source current context headers diff --git a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java index ed111b34f048f..73b9123df3cc3 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java @@ -31,6 +31,11 @@ public Map transients(Map source) { return transients; } + @Override + public Map transientsForSystemContext(Map source) { + return transients(source); + } + @Override public Map headers(Map source) { return Collections.emptyMap(); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index a1697b1898eeb..d50266d8c9e4a 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -868,19 +868,10 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - if (connection == localNodeConnection) { - // See please https://github.com/opensearch-project/OpenSearch/issues/10291 - sendRequestAsync(connection, action, request, options, handler); - } else { - final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create( - handler, - span, - tracer - ); - sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler); - } + final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer); + sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler); } } From 6e7e68585a2fa8f7af99336f436c5864dc4e96b9 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Thu, 4 Jan 2024 20:14:19 +0530 Subject: [PATCH 02/10] Clear transient header from system context Signed-off-by: Gagan Juneja --- .../common/util/concurrent/ThreadContext.java | 34 +++++++------------ .../ThreadContextStatePropagator.java | 22 +++++------- .../TaskThreadContextStatePropagator.java | 9 ++--- ...hreadContextBasedTracerContextStorage.java | 8 ++--- 4 files changed, 26 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index ff871c96e1ad9..6580b0e0085ef 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -161,7 +161,7 @@ public StoredContext stashContext() { ); } - final Map transientHeaders = propagateTransients(context.transientHeaders); + final Map transientHeaders = propagateTransients(context.transientHeaders, context.isSystemContext); if (!transientHeaders.isEmpty()) { threadContextStruct = threadContextStruct.putTransient(transientHeaders); } @@ -182,7 +182,7 @@ public StoredContext stashContext() { public Writeable captureAsWriteable() { final ThreadContextStruct context = threadLocal.get(); return out -> { - final Map propagatedHeaders = propagateHeaders(context.transientHeaders); + final Map propagatedHeaders = propagateHeaders(context.transientHeaders, context.isSystemContext); context.writeTo(out, defaultHeader, propagatedHeaders); }; } @@ -245,7 +245,7 @@ public StoredContext newStoredContext(boolean preserveResponseHeaders, Collectio final Map newTransientHeaders = new HashMap<>(originalContext.transientHeaders); boolean transientHeadersModified = false; - final Map transientHeaders = propagateTransients(originalContext.transientHeaders); + final Map transientHeaders = propagateTransients(originalContext.transientHeaders, originalContext.isSystemContext); if (!transientHeaders.isEmpty()) { newTransientHeaders.putAll(transientHeaders); transientHeadersModified = true; @@ -322,7 +322,7 @@ public Supplier wrapRestorable(StoredContext storedContext) { @Override public void writeTo(StreamOutput out) throws IOException { final ThreadContextStruct context = threadLocal.get(); - final Map propagatedHeaders = propagateHeaders(context.transientHeaders); + final Map propagatedHeaders = propagateHeaders(context.transientHeaders, context.isSystemContext); context.writeTo(out, defaultHeader, propagatedHeaders); } @@ -534,17 +534,7 @@ boolean isDefaultContext() { * by the system itself rather than by a user action. */ public void markAsSystemContext() { - ThreadContextStruct threadContextStruct = threadLocal.get(); - final Map transients = new HashMap<>(); - propagators.forEach(p -> transients.putAll(p.transientsForSystemContext(threadContextStruct.transientHeaders))); - ThreadContextStruct newThreadContextStruct = new ThreadContextStruct( - threadContextStruct.requestHeaders, - threadContextStruct.responseHeaders, - transients, - threadContextStruct.persistentHeaders, - threadContextStruct.isSystemContext - ); - threadLocal.set(newThreadContextStruct.setSystemContext()); + threadLocal.set(threadLocal.get().setSystemContext(propagators)); } /** @@ -583,15 +573,15 @@ public static Map buildDefaultHeaders(Settings settings) { } } - private Map propagateTransients(Map source) { + private Map propagateTransients(Map source, boolean isSystemContext) { final Map transients = new HashMap<>(); - propagators.forEach(p -> transients.putAll(p.transients(source))); + propagators.forEach(p -> transients.putAll(p.transients(source, isSystemContext))); return transients; } - private Map propagateHeaders(Map source) { + private Map propagateHeaders(Map source, boolean isSystemContext) { final Map headers = new HashMap<>(); - propagators.forEach(p -> headers.putAll(p.headers(source))); + propagators.forEach(p -> headers.putAll(p.headers(source, isSystemContext))); return headers; } @@ -613,11 +603,13 @@ private static final class ThreadContextStruct { // saving current warning headers' size not to recalculate the size with every new warning header private final long warningHeadersSize; - private ThreadContextStruct setSystemContext() { + private ThreadContextStruct setSystemContext(final List propagators) { if (isSystemContext) { return this; } - return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, persistentHeaders, true); + final Map transients = new HashMap<>(); + propagators.forEach(p -> transients.putAll(p.transients(transientHeaders, true))); + return new ThreadContextStruct(requestHeaders, responseHeaders, transients, persistentHeaders, true); } private ThreadContextStruct( diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java index 73346cdca256b..d6b65b4c0bdf9 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java @@ -10,7 +10,6 @@ import org.opensearch.common.annotation.PublicApi; -import java.util.HashMap; import java.util.Map; /** @@ -23,24 +22,19 @@ public interface ThreadContextStatePropagator { /** * Returns the list of transient headers that needs to be propagated from current context to new thread context. - * @param source current context transient headers + * + * @param source current context transient headers + * @param isSystemContext if the propagation is for system context. * @return the list of transient headers that needs to be propagated from current context to new thread context */ - Map transients(Map source); - - /** - * Returns the list of transient headers that need to be propagated to the child system context. - * @param source current context transient headers - * @return the list of transient headers that needs to be propagated from current context to new thread context - */ - default Map transientsForSystemContext(Map source) { - return new HashMap<>(); - } + Map transients(Map source, boolean isSystemContext); /** * Returns the list of request headers that needs to be propagated from current context to request. - * @param source current context headers + * + * @param source current context headers + * @param isSystemContext if the propagation is for system context. * @return the list of request headers that needs to be propagated from current context to request */ - Map headers(Map source); + Map headers(Map source, boolean isSystemContext); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java index 73b9123df3cc3..b7ed5e620b73b 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java @@ -21,7 +21,7 @@ */ public class TaskThreadContextStatePropagator implements ThreadContextStatePropagator { @Override - public Map transients(Map source) { + public Map transients(Map source, boolean isSystemContext) { final Map transients = new HashMap<>(); if (source.containsKey(TASK_ID)) { @@ -32,12 +32,7 @@ public Map transients(Map source) { } @Override - public Map transientsForSystemContext(Map source) { - return transients(source); - } - - @Override - public Map headers(Map source) { + public Map headers(Map source, boolean isSystemContext) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 863f56d9fbe94..756d364cfb80d 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -50,21 +50,19 @@ public void put(String key, Span span) { } @Override - public Map transients(Map source) { + public Map transients(Map source, boolean isSystemContext) { final Map transients = new HashMap<>(); - - if (source.containsKey(CURRENT_SPAN)) { + if (isSystemContext == false && source.containsKey(CURRENT_SPAN)) { final SpanReference current = (SpanReference) source.get(CURRENT_SPAN); if (current != null) { transients.put(CURRENT_SPAN, new SpanReference(current.getSpan())); } } - return transients; } @Override - public Map headers(Map source) { + public Map headers(Map source, boolean isSystemContext) { final Map headers = new HashMap<>(); if (source.containsKey(CURRENT_SPAN)) { From daeadd26bb6929b1dd921f85910af839e166534a Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Thu, 4 Jan 2024 21:40:46 +0530 Subject: [PATCH 03/10] Adds changelog Signed-off-by: Gagan Juneja --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5191c5e04a41..6fcacfe6ebe98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -206,6 +206,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix template setting override for replication type ([#11417](https://github.com/opensearch-project/OpenSearch/pull/11417)) - Fix Automatic addition of protocol broken in #11512 ([#11609](https://github.com/opensearch-project/OpenSearch/pull/11609)) - Fix issue when calling Delete PIT endpoint and no PITs exist ([#11711](https://github.com/opensearch-project/OpenSearch/pull/11711)) +- Fix parent mapping in the local transport instrumentation ([#11490](https://github.com/opensearch-project/OpenSearch/pull/11490)) ### Security From 1d0e11c5a6e5c5e8c61f0913d9027ea0cc4444b6 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Thu, 4 Jan 2024 22:00:52 +0530 Subject: [PATCH 04/10] Update CHANGELOG.md Co-authored-by: Andriy Redko Signed-off-by: Gagan Juneja --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fcacfe6ebe98..dc195195edd51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -206,7 +206,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix template setting override for replication type ([#11417](https://github.com/opensearch-project/OpenSearch/pull/11417)) - Fix Automatic addition of protocol broken in #11512 ([#11609](https://github.com/opensearch-project/OpenSearch/pull/11609)) - Fix issue when calling Delete PIT endpoint and no PITs exist ([#11711](https://github.com/opensearch-project/OpenSearch/pull/11711)) -- Fix parent mapping in the local transport instrumentation ([#11490](https://github.com/opensearch-project/OpenSearch/pull/11490)) +- Fix tracing context propagation for local transport instrumentation ([#11490](https://github.com/opensearch-project/OpenSearch/pull/11490)) ### Security From 04867e8b9270fce958c90fe27195e5e095bcf02a Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Fri, 5 Jan 2024 15:26:38 +0530 Subject: [PATCH 05/10] Adds unit tests Signed-off-by: Gagan Juneja --- .../util/concurrent/ThreadContextTests.java | 67 +++++++++++++++++++ ...TaskThreadContextStatePropagatorTests.java | 34 ++++++++++ ...ContextBasedTracerContextStorageTests.java | 16 +++++ 3 files changed, 117 insertions(+) create mode 100644 server/src/test/java/org/opensearch/tasks/TaskThreadContextStatePropagatorTests.java diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java index a0531c76bf897..7027d36f3b920 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java @@ -740,6 +740,73 @@ public void testMarkAsSystemContext() throws IOException { assertFalse(threadContext.isSystemContext()); } + public void testSystemContextWithPropagator() { + Settings build = Settings.builder().put("request.headers.default", "1").build(); + ThreadContext threadContext = new ThreadContext(build); + threadContext.registerThreadContextStatePropagator(createDummyPropagator(("test_transient_propagation_key"))); + threadContext.putHeader("foo", "bar"); + threadContext.putTransient("test_transient_propagation_key", 1); + assertEquals(Integer.valueOf(1), threadContext.getTransient("test_transient_propagation_key")); + assertEquals("bar", threadContext.getHeader("foo")); + try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("test_transient_propagation_key")); + assertEquals("1", threadContext.getHeader("default")); + } + + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals(Integer.valueOf(1), threadContext.getTransient("test_transient_propagation_key")); + assertEquals("1", threadContext.getHeader("default")); + } + + public void testSerializeSystemContext() throws IOException { + Settings build = Settings.builder().put("request.headers.default", "1").build(); + ThreadContext threadContext = new ThreadContext(build); + threadContext.registerThreadContextStatePropagator(createDummyPropagator(("test_transient_propagation_key"))); + threadContext.putHeader("foo", "bar"); + threadContext.putTransient("test_transient_propagation_key", "test"); + BytesStreamOutput out = new BytesStreamOutput(); + BytesStreamOutput outFromSystemContext = new BytesStreamOutput(); + threadContext.writeTo(out); + try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { + assertEquals("test", threadContext.getTransient("test_transient_propagation_key")); + threadContext.markAsSystemContext(); + threadContext.writeTo(outFromSystemContext); + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("test_transient_propagation_key")); + threadContext.readHeaders(outFromSystemContext.bytes().streamInput()); + assertNull(threadContext.getHeader("test_transient_propagation_key")); + } + assertEquals("test", threadContext.getTransient("test_transient_propagation_key")); + threadContext.readHeaders(out.bytes().streamInput()); + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals("test", threadContext.getHeader("test_transient_propagation_key")); + assertEquals("1", threadContext.getHeader("default")); + } + + private ThreadContextStatePropagator createDummyPropagator(final String key) { + return new ThreadContextStatePropagator() { + @Override + public Map transients(Map source, boolean isSystemContext) { + Map transients = new HashMap<>(); + if (isSystemContext == false && source.containsKey(key)) { + transients.put(key, source.get(key)); + } + return transients; + } + + @Override + public Map headers(Map source, boolean isSystemContext) { + Map headers = new HashMap<>(); + if (isSystemContext == false && source.containsKey(key)) { + headers.put(key, (String) source.get(key)); + } + return headers; + } + }; + } + public void testPutHeaders() { Settings build = Settings.builder().put("request.headers.default", "1").build(); ThreadContext threadContext = new ThreadContext(build); diff --git a/server/src/test/java/org/opensearch/tasks/TaskThreadContextStatePropagatorTests.java b/server/src/test/java/org/opensearch/tasks/TaskThreadContextStatePropagatorTests.java new file mode 100644 index 0000000000000..bfa0d566aabd7 --- /dev/null +++ b/server/src/test/java/org/opensearch/tasks/TaskThreadContextStatePropagatorTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID; + +public class TaskThreadContextStatePropagatorTests extends OpenSearchTestCase { + private final TaskThreadContextStatePropagator taskThreadContextStatePropagator = new TaskThreadContextStatePropagator(); + + public void testTransient() { + Map transientHeader = new HashMap<>(); + transientHeader.put(TASK_ID, "t_1"); + Map transientPropagatedHeader = taskThreadContextStatePropagator.transients(transientHeader, false); + assertEquals("t_1", transientPropagatedHeader.get(TASK_ID)); + } + + public void testTransientForSystemContext() { + Map transientHeader = new HashMap<>(); + transientHeader.put(TASK_ID, "t_1"); + Map transientPropagatedHeader = taskThreadContextStatePropagator.transients(transientHeader, true); + assertEquals("t_1", transientPropagatedHeader.get(TASK_ID)); + } +} diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java index ee816aa5f596d..bf11bcaf39a96 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java @@ -252,4 +252,20 @@ public void run() { assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); } + + public void testSpanNotPropagatedToChildSystemThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + try (StoredContext ignored = threadContext.stashContext()) { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + threadContext.markAsSystemContext(); + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } } From 220aff14c8b61379348a2d1fd271b56c5407e1ef Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Fri, 5 Jan 2024 23:28:17 +0530 Subject: [PATCH 06/10] Refactor code Signed-off-by: Gagan Juneja --- .../ThreadContextStatePropagator.java | 24 +++++++++++++++-- .../TaskThreadContextStatePropagator.java | 4 +-- ...hreadContextBasedTracerContextStorage.java | 16 ++++++++--- .../util/concurrent/ThreadContextTests.java | 27 ++++++++++++++++--- 4 files changed, 60 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java index d6b65b4c0bdf9..1010dcc599b90 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java @@ -20,6 +20,14 @@ */ @PublicApi(since = "2.8.0") public interface ThreadContextStatePropagator { + /** + * Returns the list of transient headers that needs to be propagated from current context to new thread context. + * + * @param source current context transient headers + * @return the list of transient headers that needs to be propagated from current context to new thread context + */ + Map transients(Map source); + /** * Returns the list of transient headers that needs to be propagated from current context to new thread context. * @@ -27,7 +35,17 @@ public interface ThreadContextStatePropagator { * @param isSystemContext if the propagation is for system context. * @return the list of transient headers that needs to be propagated from current context to new thread context */ - Map transients(Map source, boolean isSystemContext); + default Map transients(Map source, boolean isSystemContext) { + return transients(source); + }; + + /** + * Returns the list of request headers that needs to be propagated from current context to request. + * + * @param source current context headers + * @return the list of request headers that needs to be propagated from current context to request + */ + Map headers(Map source); /** * Returns the list of request headers that needs to be propagated from current context to request. @@ -36,5 +54,7 @@ public interface ThreadContextStatePropagator { * @param isSystemContext if the propagation is for system context. * @return the list of request headers that needs to be propagated from current context to request */ - Map headers(Map source, boolean isSystemContext); + default Map headers(Map source, boolean isSystemContext) { + return headers(source); + } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java index b7ed5e620b73b..ed111b34f048f 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java @@ -21,7 +21,7 @@ */ public class TaskThreadContextStatePropagator implements ThreadContextStatePropagator { @Override - public Map transients(Map source, boolean isSystemContext) { + public Map transients(Map source) { final Map transients = new HashMap<>(); if (source.containsKey(TASK_ID)) { @@ -32,7 +32,7 @@ public Map transients(Map source, boolean isSyst } @Override - public Map headers(Map source, boolean isSystemContext) { + public Map headers(Map source) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 756d364cfb80d..99a28b19e2921 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -12,6 +12,7 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.concurrent.ThreadContextStatePropagator; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -50,9 +51,9 @@ public void put(String key, Span span) { } @Override - public Map transients(Map source, boolean isSystemContext) { + public Map transients(Map source) { final Map transients = new HashMap<>(); - if (isSystemContext == false && source.containsKey(CURRENT_SPAN)) { + if (source.containsKey(CURRENT_SPAN)) { final SpanReference current = (SpanReference) source.get(CURRENT_SPAN); if (current != null) { transients.put(CURRENT_SPAN, new SpanReference(current.getSpan())); @@ -62,7 +63,16 @@ public Map transients(Map source, boolean isSyst } @Override - public Map headers(Map source, boolean isSystemContext) { + public Map transients(Map source, boolean isSystemContext) { + if (isSystemContext == true) { + return Collections.emptyMap(); + } else { + return transients(source); + } + } + + @Override + public Map headers(Map source) { final Map headers = new HashMap<>(); if (source.containsKey(CURRENT_SPAN)) { diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java index 7027d36f3b920..8fb9b71b1332f 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java @@ -787,23 +787,42 @@ public void testSerializeSystemContext() throws IOException { private ThreadContextStatePropagator createDummyPropagator(final String key) { return new ThreadContextStatePropagator() { + @Override - public Map transients(Map source, boolean isSystemContext) { + public Map transients(Map source) { Map transients = new HashMap<>(); - if (isSystemContext == false && source.containsKey(key)) { + if (source.containsKey(key)) { transients.put(key, source.get(key)); } return transients; } @Override - public Map headers(Map source, boolean isSystemContext) { + public Map transients(Map source, boolean isSystemContext) { + if (isSystemContext == true) { + return Collections.emptyMap(); + } else { + return transients(source); + } + } + + @Override + public Map headers(Map source) { Map headers = new HashMap<>(); - if (isSystemContext == false && source.containsKey(key)) { + if (source.containsKey(key)) { headers.put(key, (String) source.get(key)); } return headers; } + + @Override + public Map headers(Map source, boolean isSystemContext) { + if (isSystemContext == true) { + return Collections.emptyMap(); + } else { + return headers(source); + } + } }; } From 9b9a8951ba82916b1da2f3caeaec0af357ee654f Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Mon, 8 Jan 2024 22:00:26 +0530 Subject: [PATCH 07/10] Refactor code Signed-off-by: Gagan Juneja --- .../ThreadContextStatePropagator.java | 2 + ...hreadContextBasedTracerContextStorage.java | 5 ++ .../util/concurrent/ThreadContextTests.java | 67 +++++++------------ 3 files changed, 31 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java index 1010dcc599b90..e8c12ae13d5eb 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java @@ -26,6 +26,7 @@ public interface ThreadContextStatePropagator { * @param source current context transient headers * @return the list of transient headers that needs to be propagated from current context to new thread context */ + @Deprecated(since = "2.12.0", forRemoval = true) Map transients(Map source); /** @@ -45,6 +46,7 @@ default Map transients(Map source, boolean isSys * @param source current context headers * @return the list of request headers that needs to be propagated from current context to request */ + @Deprecated(since = "2.12.0", forRemoval = true) Map headers(Map source); /** diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 99a28b19e2921..7a2f4c7a4f4b4 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -85,6 +85,11 @@ public Map headers(Map source) { return headers; } + @Override + public Map headers(Map source, boolean isSystemContext) { + return headers(source); + } + Span getCurrentSpan(String key) { SpanReference currentSpanRef = threadContext.getTransient(key); return (currentSpanRef == null) ? null : currentSpanRef.getSpan(); diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java index 8fb9b71b1332f..10669ca1a805b 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java @@ -44,6 +44,8 @@ import java.util.Map; import java.util.function.Supplier; +import org.mockito.Mockito; + import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -742,8 +744,18 @@ public void testMarkAsSystemContext() throws IOException { public void testSystemContextWithPropagator() { Settings build = Settings.builder().put("request.headers.default", "1").build(); + Map transientHeaderMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map transientHeaderTransformedMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map headerMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map headerTransformedMap = Collections.singletonMap("test_transient_propagation_key", "test"); ThreadContext threadContext = new ThreadContext(build); - threadContext.registerThreadContextStatePropagator(createDummyPropagator(("test_transient_propagation_key"))); + ThreadContextStatePropagator mockPropagator = Mockito.mock(ThreadContextStatePropagator.class); + Mockito.when(mockPropagator.transients(transientHeaderMap, true)).thenReturn(Collections.emptyMap()); + Mockito.when(mockPropagator.transients(transientHeaderMap, false)).thenReturn(transientHeaderTransformedMap); + + Mockito.when(mockPropagator.headers(headerMap, true)).thenReturn(headerTransformedMap); + Mockito.when(mockPropagator.headers(headerMap, false)).thenReturn(headerTransformedMap); + threadContext.registerThreadContextStatePropagator(mockPropagator); threadContext.putHeader("foo", "bar"); threadContext.putTransient("test_transient_propagation_key", 1); assertEquals(Integer.valueOf(1), threadContext.getTransient("test_transient_propagation_key")); @@ -762,8 +774,18 @@ public void testSystemContextWithPropagator() { public void testSerializeSystemContext() throws IOException { Settings build = Settings.builder().put("request.headers.default", "1").build(); + Map transientHeaderMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map transientHeaderTransformedMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map headerMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map headerTransformedMap = Collections.singletonMap("test_transient_propagation_key", "test"); ThreadContext threadContext = new ThreadContext(build); - threadContext.registerThreadContextStatePropagator(createDummyPropagator(("test_transient_propagation_key"))); + ThreadContextStatePropagator mockPropagator = Mockito.mock(ThreadContextStatePropagator.class); + Mockito.when(mockPropagator.transients(transientHeaderMap, true)).thenReturn(Collections.emptyMap()); + Mockito.when(mockPropagator.transients(transientHeaderMap, false)).thenReturn(transientHeaderTransformedMap); + + Mockito.when(mockPropagator.headers(headerMap, true)).thenReturn(headerTransformedMap); + Mockito.when(mockPropagator.headers(headerMap, false)).thenReturn(headerTransformedMap); + threadContext.registerThreadContextStatePropagator(mockPropagator); threadContext.putHeader("foo", "bar"); threadContext.putTransient("test_transient_propagation_key", "test"); BytesStreamOutput out = new BytesStreamOutput(); @@ -785,47 +807,6 @@ public void testSerializeSystemContext() throws IOException { assertEquals("1", threadContext.getHeader("default")); } - private ThreadContextStatePropagator createDummyPropagator(final String key) { - return new ThreadContextStatePropagator() { - - @Override - public Map transients(Map source) { - Map transients = new HashMap<>(); - if (source.containsKey(key)) { - transients.put(key, source.get(key)); - } - return transients; - } - - @Override - public Map transients(Map source, boolean isSystemContext) { - if (isSystemContext == true) { - return Collections.emptyMap(); - } else { - return transients(source); - } - } - - @Override - public Map headers(Map source) { - Map headers = new HashMap<>(); - if (source.containsKey(key)) { - headers.put(key, (String) source.get(key)); - } - return headers; - } - - @Override - public Map headers(Map source, boolean isSystemContext) { - if (isSystemContext == true) { - return Collections.emptyMap(); - } else { - return headers(source); - } - } - }; - } - public void testPutHeaders() { Settings build = Settings.builder().put("request.headers.default", "1").build(); ThreadContext threadContext = new ThreadContext(build); From 569735282b86e7eb16443ece2d97324c5aa15cf3 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Mon, 8 Jan 2024 22:21:23 +0530 Subject: [PATCH 08/10] Refactor code Signed-off-by: Gagan Juneja --- .../tracing/ThreadContextBasedTracerContextStorage.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 7a2f4c7a4f4b4..908164d1935a7 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -51,6 +51,7 @@ public void put(String key, Span span) { } @Override + @SuppressWarnings("removal") public Map transients(Map source) { final Map transients = new HashMap<>(); if (source.containsKey(CURRENT_SPAN)) { @@ -72,6 +73,7 @@ public Map transients(Map source, boolean isSyst } @Override + @SuppressWarnings("removal") public Map headers(Map source) { final Map headers = new HashMap<>(); From ad29c2e07d318371de32cc4329aaa78b32f9fd5e Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Mon, 8 Jan 2024 22:41:24 +0530 Subject: [PATCH 09/10] Supress warning Signed-off-by: Gagan Juneja --- .../tasks/TaskThreadContextStatePropagator.java | 13 ++++++++++++- .../ThreadContextBasedTracerContextStorage.java | 2 -- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java index ed111b34f048f..dd95329e57a59 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java @@ -20,7 +20,8 @@ * Propagates TASK_ID across thread contexts */ public class TaskThreadContextStatePropagator implements ThreadContextStatePropagator { - @Override + + @SuppressWarnings("removal") public Map transients(Map source) { final Map transients = new HashMap<>(); @@ -32,7 +33,17 @@ public Map transients(Map source) { } @Override + public Map transients(Map source, boolean isSystemContext) { + return transients(source); + } + + @SuppressWarnings("removal") public Map headers(Map source) { return Collections.emptyMap(); } + + @Override + public Map headers(Map source, boolean isSystemContext) { + return headers(source); + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 908164d1935a7..6638b8e555979 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -50,7 +50,6 @@ public void put(String key, Span span) { } } - @Override @SuppressWarnings("removal") public Map transients(Map source) { final Map transients = new HashMap<>(); @@ -72,7 +71,6 @@ public Map transients(Map source, boolean isSyst } } - @Override @SuppressWarnings("removal") public Map headers(Map source) { final Map headers = new HashMap<>(); From c5614ad2a77d99b20cbaa86216976f52bb681b2f Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Mon, 8 Jan 2024 23:33:00 +0530 Subject: [PATCH 10/10] Refactor code Signed-off-by: Gagan Juneja --- .../org/opensearch/tasks/TaskThreadContextStatePropagator.java | 2 ++ .../tracing/ThreadContextBasedTracerContextStorage.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java index dd95329e57a59..99559e45aaaee 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java @@ -21,6 +21,7 @@ */ public class TaskThreadContextStatePropagator implements ThreadContextStatePropagator { + @Override @SuppressWarnings("removal") public Map transients(Map source) { final Map transients = new HashMap<>(); @@ -37,6 +38,7 @@ public Map transients(Map source, boolean isSyst return transients(source); } + @Override @SuppressWarnings("removal") public Map headers(Map source) { return Collections.emptyMap(); diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 6638b8e555979..908164d1935a7 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -50,6 +50,7 @@ public void put(String key, Span span) { } } + @Override @SuppressWarnings("removal") public Map transients(Map source) { final Map transients = new HashMap<>(); @@ -71,6 +72,7 @@ public Map transients(Map source, boolean isSyst } } + @Override @SuppressWarnings("removal") public Map headers(Map source) { final Map headers = new HashMap<>();