Skip to content

Commit

Permalink
Remove instrumentation from TransportService
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Sep 8, 2023
1 parent 5225294 commit 8709bcd
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,7 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {

InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE;
if (!exporter.getFinishedSpanItems().isEmpty()) {
/**
* At present, transport action is not instrumented and all the downstream search calls don't have a parent so,
* they all should be creating a separate trace. Once the transportAction will be instrumented this test will
* start failing and requires the value to be updated to 2.
*/
validators.validate(exporter.getFinishedSpanItems(), 6);
validators.validate(exporter.getFinishedSpanItems(), 2);
}
}

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

95 changes: 42 additions & 53 deletions server/src/main/java/org/opensearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.handler.TraceableTransportResponseHandler;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -865,60 +861,53 @@ public final <T extends TransportResponse> void sendRequest(
final TransportRequestOptions options,
final TransportResponseHandler<T> handler
) {
final Span span = tracer.startSpan(SpanBuilder.from(action, connection));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
TransportResponseHandler<T> traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer);
try {
logger.debug("Action: " + action);
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
final Releasable unregisterChildNode = taskManager.registerChildNode(
request.getParentTask().getId(),
connection.getNode()
);
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
traceableTransportResponseHandler.handleResponse(response);
}
try {
logger.debug("Action: " + action);
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode());
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}

@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
traceableTransportResponseHandler.handleException(exp);
}
@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}

@Override
public String executor() {
return traceableTransportResponseHandler.executor();
}
@Override
public String executor() {
return handler.executor();
}

@Override
public T read(StreamInput in) throws IOException {
return traceableTransportResponseHandler.read(in);
}
@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}

@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + traceableTransportResponseHandler.toString();
}
};
} else {
delegate = traceableTransportResponseHandler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
traceableTransportResponseHandler.handleException(te);
@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = handler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
handler.handleException(te);
}
}

Expand Down

0 comments on commit 8709bcd

Please sign in to comment.