From d7a750ca7a0c7e52019abcad5659ea71f59fd8f5 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Thu, 14 Sep 2023 15:49:50 +0530 Subject: [PATCH] Add instrumentation to transport service Signed-off-by: Gagan Juneja --- .../single/SingleNodeDiscoveryIT.java | 2 + .../TraceableTransportResponseHandler.java | 107 ++++++++++++++++++ .../tracing/handler/package-info.java | 12 ++ .../transport/TransportResponseHandler.java | 2 + .../transport/TransportService.java | 99 +++++++++------- .../test/OpenSearchIntegTestCase.java | 3 +- 6 files changed, 181 insertions(+), 44 deletions(-) create mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java create mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java index 90bdcf7fded11..1f6c8eac6c391 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java @@ -76,6 +76,7 @@ public void testSingleNodesDoNotDiscoverEachOther() throws IOException, Interrup @Override public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() + .put(featureFlagSettings()) .put("discovery.type", "single-node") .put("transport.type", getTestTransportType()) /* @@ -142,6 +143,7 @@ public boolean innerMatch(final LogEvent event) { @Override public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() + .put(featureFlagSettings()) .put("discovery.type", "zen") .put("transport.type", getTestTransportType()) .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s") diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java new file mode 100644 index 0000000000000..abddfcc6cebc1 --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.handler; + +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; + +import java.io.IOException; +import java.util.Objects; + +/** + * Tracer wrapped {@link TransportResponseHandler} + * @param TransportResponse + */ +public class TraceableTransportResponseHandler implements TransportResponseHandler { + + private final Span span; + private final TransportResponseHandler delegate; + private final Tracer tracer; + + /** + * Constructor. + * + * @param delegate delegate + * @param span span + * @param tracer tracer + */ + private TraceableTransportResponseHandler(TransportResponseHandler delegate, Span span, Tracer tracer) { + this.delegate = Objects.requireNonNull(delegate); + this.span = Objects.requireNonNull(span); + this.tracer = Objects.requireNonNull(tracer); + } + + /** + * Factory method. + * @param delegate delegate + * @param span span + * @param tracer tracer + * @return transportResponseHandler + */ + public static TransportResponseHandler create( + TransportResponseHandler delegate, + Span span, + Tracer tracer + ) { + if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { + return new TraceableTransportResponseHandler(delegate, span, tracer); + } else { + return delegate; + } + } + + @Override + public T read(StreamInput in) throws IOException { + return delegate.read(in); + } + + @Override + public void handleResponse(T response) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.handleResponse(response); + } finally { + span.endSpan(); + } + } + + @Override + public void handleException(TransportException exp) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.handleException(exp); + } finally { + span.setError(exp); + span.endSpan(); + } + } + + @Override + public String executor() { + return delegate.executor(); + } + + @Override + public String toString() { + return delegate.toString(); + } + + @Override + public void handleRejection(Exception exp) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.handleRejection(exp); + } finally { + span.endSpan(); + } + } +} diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java new file mode 100644 index 0000000000000..ff9f8f57dc07c --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains classes needed for tracing requests. + */ +package org.opensearch.telemetry.tracing.handler; diff --git a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java index 0b39983cc3bee..1388b006ca8b1 100644 --- a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java @@ -52,6 +52,8 @@ public interface TransportResponseHandler extends W String executor(); + default void handleRejection(Exception exp) {}; + default TransportResponseHandler wrap(Function converter, Writeable.Reader reader) { final TransportResponseHandler self = this; return new TransportResponseHandler() { diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 52274872e8cc8..32bedb52a9cef 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -66,7 +66,11 @@ import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.handler.TraceableTransportResponseHandler; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -333,6 +337,7 @@ protected void doStop() { getExecutorService().execute(new AbstractRunnable() { @Override public void onRejection(Exception e) { + holderToNotify.handler().handleRejection(e); // if we get rejected during node shutdown we don't wanna bubble it up logger.debug( () -> new ParameterizedMessage( @@ -345,6 +350,7 @@ public void onRejection(Exception e) { @Override public void onFailure(Exception e) { + holderToNotify.handler().handleRejection(e); logger.warn( () -> new ParameterizedMessage( "failed to notify response handler on exception, action: {}", @@ -861,53 +867,60 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - try { - logger.debug("Action: " + action); - final TransportResponseHandler delegate; - if (request.getParentTask().isSet()) { - // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. - final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); - delegate = new TransportResponseHandler() { - @Override - public void handleResponse(T response) { - unregisterChildNode.close(); - handler.handleResponse(response); - } + final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer); + try { + logger.debug("Action: " + action); + final TransportResponseHandler delegate; + if (request.getParentTask().isSet()) { + // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. + final Releasable unregisterChildNode = taskManager.registerChildNode( + request.getParentTask().getId(), + connection.getNode() + ); + delegate = new TransportResponseHandler() { + @Override + public void handleResponse(T response) { + unregisterChildNode.close(); + traceableTransportResponseHandler.handleResponse(response); + } - @Override - public void handleException(TransportException exp) { - unregisterChildNode.close(); - handler.handleException(exp); - } + @Override + public void handleException(TransportException exp) { + unregisterChildNode.close(); + traceableTransportResponseHandler.handleException(exp); + } - @Override - public String executor() { - return handler.executor(); - } + @Override + public String executor() { + return traceableTransportResponseHandler.executor(); + } - @Override - public T read(StreamInput in) throws IOException { - return handler.read(in); - } + @Override + public T read(StreamInput in) throws IOException { + return traceableTransportResponseHandler.read(in); + } - @Override - public String toString() { - return getClass().getName() + "/[" + action + "]:" + handler.toString(); - } - }; - } else { - delegate = handler; - } - asyncSender.sendRequest(connection, action, request, options, delegate); - } catch (final Exception ex) { - // the caller might not handle this so we invoke the handler - final TransportException te; - if (ex instanceof TransportException) { - te = (TransportException) ex; - } else { - te = new TransportException("failure to send", ex); + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); + } + }; + } else { + delegate = traceableTransportResponseHandler; + } + asyncSender.sendRequest(connection, action, request, options, delegate); + } catch (final Exception ex) { + // the caller might not handle this so we invoke the handler + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + traceableTransportResponseHandler.handleException(te); } - handler.handleException(te); } } @@ -1017,6 +1030,7 @@ private void sendRequestInternal( threadPool.executor(executor).execute(new AbstractRunnable() { @Override public void onRejection(Exception e) { + contextToNotify.handler().handleRejection(e); // if we get rejected during node shutdown we don't wanna bubble it up logger.debug( () -> new ParameterizedMessage( @@ -1029,6 +1043,7 @@ public void onRejection(Exception e) { @Override public void onFailure(Exception e) { + contextToNotify.handler().handleRejection(e); logger.warn( () -> new ParameterizedMessage( "failed to notify response handler on exception, action: {}", diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 25f453fe024ff..6e064f943ca07 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2300,9 +2300,8 @@ public static void afterClass() throws Exception { INSTANCE.printTestMessage("cleaning up after"); INSTANCE.afterInternal(true); checkStaticState(true); - StrictCheckSpanProcessor.validateTracingStateOnShutdown(); } - + StrictCheckSpanProcessor.validateTracingStateOnShutdown(); } finally { SUITE_SEED = null; currentCluster = null;