From 8709bcdad83608f1b67e07762b70297fcd3348de Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Fri, 8 Sep 2023 20:34:35 +0530 Subject: [PATCH] Remove instrumentation from TransportService Signed-off-by: Gagan Juneja --- .../TelemetryTracerEnabledSanityIT.java | 7 +- .../TraceableTransportResponseHandler.java | 98 ------------------- .../tracing/handler/package-info.java | 12 --- .../listener/TraceableTaskListener.java | 77 --------------- .../transport/TransportService.java | 95 ++++++++---------- 5 files changed, 43 insertions(+), 246 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java delete mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java delete mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableTaskListener.java diff --git a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java index 5966a37eb89d1..2d0111e64faad 100644 --- a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java +++ b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java @@ -89,12 +89,7 @@ public void testSanityChecksWhenTracingEnabled() throws Exception { InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE; if (!exporter.getFinishedSpanItems().isEmpty()) { - /** - * At present, transport action is not instrumented and all the downstream search calls don't have a parent so, - * they all should be creating a separate trace. Once the transportAction will be instrumented this test will - * start failing and requires the value to be updated to 2. - */ - validators.validate(exporter.getFinishedSpanItems(), 6); + validators.validate(exporter.getFinishedSpanItems(), 2); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java deleted file mode 100644 index 0395e41087cfa..0000000000000 --- a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.telemetry.tracing.handler; - -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.transport.TransportResponse; -import org.opensearch.telemetry.tracing.Span; -import org.opensearch.telemetry.tracing.SpanScope; -import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportResponseHandler; - -import java.io.IOException; -import java.util.Objects; - -/** - * Tracer wrapped {@link TransportResponseHandler} - * @param TransportResponse - */ -public class TraceableTransportResponseHandler implements TransportResponseHandler { - - private final Span span; - private final TransportResponseHandler delegate; - private final Tracer tracer; - - /** - * Constructor. - * - * @param delegate delegate - * @param span span - * @param tracer tracer - */ - private TraceableTransportResponseHandler(TransportResponseHandler delegate, Span span, Tracer tracer) { - this.delegate = Objects.requireNonNull(delegate); - this.span = Objects.requireNonNull(span); - this.tracer = Objects.requireNonNull(tracer); - } - - /** - * Factory method. - * @param delegate delegate - * @param span span - * @param tracer tracer - * @return transportResponseHandler - */ - public static TransportResponseHandler create( - TransportResponseHandler delegate, - Span span, - Tracer tracer - ) { - if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { - return new TraceableTransportResponseHandler(delegate, span, tracer); - } else { - return delegate; - } - } - - @Override - public T read(StreamInput in) throws IOException { - return delegate.read(in); - } - - @Override - public void handleResponse(T response) { - try (SpanScope scope = tracer.withSpanInScope(span)) { - delegate.handleResponse(response); - } finally { - span.endSpan(); - } - } - - @Override - public void handleException(TransportException exp) { - try (SpanScope scope = tracer.withSpanInScope(span)) { - delegate.handleException(exp); - } finally { - span.setError(exp); - span.endSpan(); - } - } - - @Override - public String executor() { - return delegate.executor(); - } - - @Override - public String toString() { - return delegate.toString(); - } -} diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java deleted file mode 100644 index ff9f8f57dc07c..0000000000000 --- a/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java +++ /dev/null @@ -1,12 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** - * This package contains classes needed for tracing requests. - */ -package org.opensearch.telemetry.tracing.handler; diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableTaskListener.java b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableTaskListener.java deleted file mode 100644 index b5c2549f258b6..0000000000000 --- a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableTaskListener.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.telemetry.tracing.listener; - -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskListener; -import org.opensearch.telemetry.tracing.Span; -import org.opensearch.telemetry.tracing.SpanScope; -import org.opensearch.telemetry.tracing.Tracer; - -import java.util.Objects; - -/** - * Tracer wrapped {@link TaskListener} - * @param response. - */ -public class TraceableTaskListener implements TaskListener { - - private final TaskListener delegate; - private final Span span; - private final Tracer tracer; - - /** - * Constructor. - * - * @param delegate delegate - * @param span span - * @param tracer tracer - */ - private TraceableTaskListener(TaskListener delegate, Span span, Tracer tracer) { - this.delegate = Objects.requireNonNull(delegate); - this.span = Objects.requireNonNull(span); - this.tracer = Objects.requireNonNull(tracer); - } - - /** - * Factory method. - * @param delegate delegate - * @param span span - * @param tracer tracer - * @return task listener - */ - public static TaskListener create(TaskListener delegate, Span span, Tracer tracer) { - if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { - return new TraceableTaskListener(delegate, span, tracer); - } else { - return delegate; - } - } - - @Override - public void onResponse(Task task, Response response) { - try (SpanScope scope = tracer.withSpanInScope(span)) { - delegate.onResponse(task, response); - } finally { - span.endSpan(); - } - - } - - @Override - public void onFailure(Task task, Exception e) { - try (SpanScope scope = tracer.withSpanInScope(span)) { - delegate.onFailure(task, e); - } finally { - span.setError(e); - span.endSpan(); - } - } -} diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 23e9d37b66d91..52274872e8cc8 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -66,11 +66,7 @@ import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; -import org.opensearch.telemetry.tracing.Span; -import org.opensearch.telemetry.tracing.SpanBuilder; -import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.handler.TraceableTransportResponseHandler; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -865,60 +861,53 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer); - try { - logger.debug("Action: " + action); - final TransportResponseHandler delegate; - if (request.getParentTask().isSet()) { - // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. - final Releasable unregisterChildNode = taskManager.registerChildNode( - request.getParentTask().getId(), - connection.getNode() - ); - delegate = new TransportResponseHandler() { - @Override - public void handleResponse(T response) { - unregisterChildNode.close(); - traceableTransportResponseHandler.handleResponse(response); - } + try { + logger.debug("Action: " + action); + final TransportResponseHandler delegate; + if (request.getParentTask().isSet()) { + // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. + final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); + delegate = new TransportResponseHandler() { + @Override + public void handleResponse(T response) { + unregisterChildNode.close(); + handler.handleResponse(response); + } - @Override - public void handleException(TransportException exp) { - unregisterChildNode.close(); - traceableTransportResponseHandler.handleException(exp); - } + @Override + public void handleException(TransportException exp) { + unregisterChildNode.close(); + handler.handleException(exp); + } - @Override - public String executor() { - return traceableTransportResponseHandler.executor(); - } + @Override + public String executor() { + return handler.executor(); + } - @Override - public T read(StreamInput in) throws IOException { - return traceableTransportResponseHandler.read(in); - } + @Override + public T read(StreamInput in) throws IOException { + return handler.read(in); + } - @Override - public String toString() { - return getClass().getName() + "/[" + action + "]:" + traceableTransportResponseHandler.toString(); - } - }; - } else { - delegate = traceableTransportResponseHandler; - } - asyncSender.sendRequest(connection, action, request, options, delegate); - } catch (final Exception ex) { - // the caller might not handle this so we invoke the handler - final TransportException te; - if (ex instanceof TransportException) { - te = (TransportException) ex; - } else { - te = new TransportException("failure to send", ex); - } - traceableTransportResponseHandler.handleException(te); + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); + } + }; + } else { + delegate = handler; + } + asyncSender.sendRequest(connection, action, request, options, delegate); + } catch (final Exception ex) { + // the caller might not handle this so we invoke the handler + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); } + handler.handleException(te); } }