Skip to content

Commit

Permalink
Add Tracing Instrumentation at Network and Rest layer (opensearch-pro…
Browse files Browse the repository at this point in the history
…ject#9415)

* Add Instrumentation

Signed-off-by: Gagan Juneja <[email protected]>

* update changelog

Signed-off-by: Gagan Juneja <[email protected]>

* clear the context

Signed-off-by: Gagan Juneja <[email protected]>

* Add Instrumentation

Signed-off-by: Gagan Juneja <[email protected]>

* Fix test cases

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Add Instrumentation

Signed-off-by: Gagan Juneja <[email protected]>

* Add Instrumentation

Signed-off-by: Gagan Juneja <[email protected]>

* Remove internal transport action check

Signed-off-by: Gagan Juneja <[email protected]>

* Changes

Signed-off-by: Gagan Juneja <[email protected]>

* Refactor code

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Fix failing test cases

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Fix failing test cases

Signed-off-by: Gagan Juneja <[email protected]>

* Fix failing test cases

Signed-off-by: Gagan Juneja <[email protected]>

* Fix failing test cases

Signed-off-by: Gagan Juneja <[email protected]>

* Fix failing test cases

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Fix failing test cases

Signed-off-by: Gagan Juneja <[email protected]>

* Fix java doc

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address merge conflicts

Signed-off-by: Gagan Juneja <[email protected]>

* Address merge conflicts

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Enable NumberOfTraceIDsEqualToRequests test

Signed-off-by: Gagan Juneja <[email protected]>

* Fix java doc

Signed-off-by: Gagan Juneja <[email protected]>

* Fix java doc

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Fix test case

Signed-off-by: Gagan Juneja <[email protected]>

* Remove instrumentation from TransportService

Signed-off-by: Gagan Juneja <[email protected]>

---------

Signed-off-by: Gagan Juneja <[email protected]>
Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gaganjuneja and Gagan Juneja committed Sep 8, 2023
1 parent 529a5b8 commit df16ce0
Show file tree
Hide file tree
Showing 122 changed files with 1,207 additions and 276 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Move ZStd to a plugin ([#9658](https://github.com/opensearch-project/OpenSearch/pull/9658))
- [Remote Store] Add support for Remote Translog Store upload stats in `_nodes/stats/` API ([#8908](https://github.com/opensearch-project/OpenSearch/pull/8908))
- [Remote Store] Removing feature flag to mark feature GA ([#9761](https://github.com/opensearch-project/OpenSearch/pull/9761))
- Add instrumentation in rest and network layer. ([#9415](https://github.com/opensearch-project/OpenSearch/pull/9415))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ protected void addDefaultAttributes(Span span) {
}

@Override
public Span startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes);
return startSpan(
spanCreationContext.getSpanName(),
propagatedSpan.map(SpanContext::new).orElse(null),
spanCreationContext.getAttributes()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.SpanCreationContext;

import java.util.List;
import java.util.Map;
Expand All @@ -28,10 +28,9 @@ public interface HttpTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanName span name.
* @param spanCreationContext span name.
* @param header http request header.
* @param attributes span attributes.
* @return span.
*/
Span startSpan(String spanName, Map<String, List<String>> header, Attributes attributes);
Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header);
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void close() {
}

@Override
public Span startSpan(String spanName, Map<String, List<String>> header, Attributes attributes) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header) {
return NoopSpan.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpReadTimeoutException;
import org.opensearch.http.HttpServerChannel;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NettyAllocator;
import org.opensearch.transport.NettyByteBufSizer;
Expand Down Expand Up @@ -174,9 +175,10 @@ public Netty4HttpServerTransport(
NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher,
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings);
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.http.netty4.Netty4HttpServerTransport;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.netty4.Netty4Transport;

Expand Down Expand Up @@ -122,7 +123,8 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings
ClusterSettings clusterSettings,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_HTTP_TRANSPORT_NAME,
Expand All @@ -134,7 +136,8 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
xContentRegistry,
dispatcher,
clusterSettings,
getSharedGroupFactory(settings)
getSharedGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -112,7 +113,8 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
xContentRegistry(),
dispatcher,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(Settings.EMPTY)
new SharedGroupFactory(Settings.EMPTY),
NoopTracer.INSTANCE
)
) {
httpServerTransport.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.http.HttpResponse;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.NullDispatcher;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -135,7 +136,8 @@ class CustomNettyHttpServerTransport extends Netty4HttpServerTransport {
xContentRegistry(),
new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -198,7 +199,8 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
xContentRegistry(),
dispatcher,
clusterSettings,
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down Expand Up @@ -247,7 +249,8 @@ public void testBindUnavailableAddress() {
xContentRegistry(),
new NullDispatcher(),
clusterSettings,
new SharedGroupFactory(Settings.EMPTY)
new SharedGroupFactory(Settings.EMPTY),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand All @@ -265,7 +268,8 @@ public void testBindUnavailableAddress() {
xContentRegistry(),
new NullDispatcher(),
clusterSettings,
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start);
Expand Down Expand Up @@ -317,7 +321,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
xContentRegistry(),
dispatcher,
clusterSettings,
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down Expand Up @@ -379,7 +384,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
xContentRegistry(),
dispatcher,
clusterSettings,
new SharedGroupFactory(Settings.EMPTY)
new SharedGroupFactory(Settings.EMPTY),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down Expand Up @@ -448,7 +454,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
xContentRegistry(),
dispatcher,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down Expand Up @@ -521,7 +528,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
xContentRegistry(),
dispatcher,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -99,7 +100,14 @@ public TransportAddress[] addressesFromString(String address) {
return new TransportAddress[] { poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress()) };
}
};
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
return new MockTransportService(
Settings.EMPTY,
transport,
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null,
NoopTracer.INSTANCE
);
}

protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.discovery.SeedHostsProvider;
import org.opensearch.discovery.SeedHostsResolver;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.nio.MockNioTransport;
Expand Down Expand Up @@ -80,7 +81,8 @@ protected MockTransportService createTransportService() {
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null
null,
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void setProjectName() {

@Before
public void createTransportService() {
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null);
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, NoopTracer.INSTANCE);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import org.opensearch.test.telemetry.tracing.MockSpanData;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import io.opentelemetry.sdk.common.CompletableResultCode;
Expand All @@ -21,7 +23,7 @@

public class InMemorySingletonSpanExporter implements SpanExporter {

private static final InMemorySingletonSpanExporter INSTANCE = new InMemorySingletonSpanExporter(InMemorySpanExporter.create());
public static final InMemorySingletonSpanExporter INSTANCE = new InMemorySingletonSpanExporter(InMemorySpanExporter.create());

private static InMemorySpanExporter delegate;

Expand Down Expand Up @@ -62,10 +64,30 @@ private List<MockSpanData> convertSpanDataListToMockSpanDataList(List<SpanData>
spanData.getStartEpochNanos(),
spanData.getEndEpochNanos(),
spanData.hasEnded(),
spanData.getName()
spanData.getName(),
getAttributes(spanData)
)
)
.collect(Collectors.toList());
return mockSpanDataList;
}

private Map<String, Object> getAttributes(SpanData spanData) {
if (spanData.getAttributes() != null) {
return spanData.getAttributes()
.asMap()
.entrySet()
.stream()
.collect(Collectors.toMap(e -> e.getKey().getKey(), e -> e.getValue()));
} else {
return Collections.emptyMap();
}
}

/**
* Clears the state.
*/
public void reset() {
delegate.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ public void testSanityCheckWhenTracingDisabled() throws Exception {

ensureGreen();
refresh();
InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE;
exporter.reset();

// Make the search call;
client.prepareSearch().setQuery(queryStringQuery("fox")).get();

// Sleep for about 3s to wait for traces are published (the delay is 1s)
Thread.sleep(3000);

InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.create();
assertTrue(exporter.getFinishedSpanItems().isEmpty());
}

Expand Down
Loading

0 comments on commit df16ce0

Please sign in to comment.