From dc78ebed763943de0774ff3b5f64286d8666922d Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Wed, 6 Sep 2023 16:20:14 +0530 Subject: [PATCH] Fix the telemetry strict check framework Signed-off-by: Gagan Juneja --- .../single/SingleNodeDiscoveryIT.java | 2 + .../main/java/org/opensearch/node/Node.java | 9 +++++ .../transport/TransportService.java | 37 ++++++++++--------- .../java/org/opensearch/node/MockNode.java | 7 ++++ .../opensearch/test/InternalTestCluster.java | 36 +++++++++++++++++- .../test/OpenSearchIntegTestCase.java | 2 +- .../test/OpenSearchSingleNodeTestCase.java | 21 +++++++++++ .../test/telemetry/MockTelemetry.java | 26 +------------ .../test/telemetry/MockTelemetryPlugin.java | 36 +----------------- .../tracing/MockTracingTelemetry.java | 36 +++++++++--------- .../tracing/StrictCheckSpanProcessor.java | 9 ++++- 11 files changed, 122 insertions(+), 99 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java index 90bdcf7fded11..59e257ae47a44 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java @@ -114,6 +114,7 @@ public Path nodeConfigPath(int nodeOrdinal) { assertThat(second.nodes().getSize(), equalTo(1)); assertThat(first.nodes().getClusterManagerNodeId(), not(equalTo(second.nodes().getClusterManagerNodeId()))); assertThat(first.metadata().clusterUUID(), not(equalTo(second.metadata().clusterUUID()))); + internalCluster().stopNodesAndClients(); } } @@ -178,6 +179,7 @@ public Path nodeConfigPath(int nodeOrdinal) { final ClusterState first = internalCluster().getInstance(ClusterService.class).state(); assertThat(first.nodes().getSize(), equalTo(1)); assertBusy(() -> mockAppender.assertAllExpectationsMatched()); + internalCluster().stopNodesAndClients(); } } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 67667346036f6..6256e9ae0aa11 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -219,6 +219,7 @@ import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.tasks.TaskResultsService; import org.opensearch.tasks.consumer.TopNSearchTasksLogger; +import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetryModule; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.telemetry.tracing.NoopTracerFactory; @@ -392,6 +393,8 @@ public static class DiscoverySettings { private FileCache fileCache; private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; + private final Telemetry telemetry; + public Node(Environment environment) { this(environment, Collections.emptyList(), true); } @@ -587,9 +590,11 @@ protected Node( final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings()); List telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class); TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings); + telemetry = telemetryModule.getTelemetry().orElse(null); tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext()); } else { tracerFactory = new NoopTracerFactory(); + telemetry = null; } tracer = tracerFactory.getTracer(); @@ -1805,4 +1810,8 @@ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreake public FileCache fileCache() { return this.fileCache; } + + protected Optional getTelemetry() { + return Optional.ofNullable(telemetry); + } } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 83311393bc810..7cf34de471af5 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -865,15 +865,15 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - try { - logger.debug("Action: " + action); - final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - final TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create( - handler, - span, - tracer - ); + logger.debug("Action: " + action); + final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + final TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create( + handler, + span, + tracer + ); + try { final TransportResponseHandler delegate; if (request.getParentTask().isSet()) { // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. @@ -913,16 +913,17 @@ public String toString() { delegate = traceableTransportResponseHandler; } asyncSender.sendRequest(connection, action, request, options, delegate); + } catch (final Exception ex) { + // the caller might not handle this so we invoke the handler + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + span.endSpan(); + traceableTransportResponseHandler.handleException(te); } - } catch (final Exception ex) { - // the caller might not handle this so we invoke the handler - final TransportException te; - if (ex instanceof TransportException) { - te = (TransportException) ex; - } else { - te = new TransportException("failure to send", ex); - } - handler.handleException(te); } } diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index e6c7e21d5b3ea..ca16a714fd6ca 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -60,6 +60,7 @@ import org.opensearch.search.SearchService; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.query.QueryPhase; +import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.test.MockHttpTransport; import org.opensearch.test.transport.MockTransportService; @@ -72,6 +73,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; import java.util.function.Function; @@ -272,4 +274,9 @@ protected void configureNodeAndClusterIdStateListener(ClusterService clusterServ public NamedWriteableRegistry getNamedWriteableRegistry() { return namedWriteableRegistry; } + + @Override + public Optional getTelemetry() { + return super.getTelemetry(); + } } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 11e847e29a097..1339d8a802043 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -116,7 +116,10 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.SearchService; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.Telemetry; import org.opensearch.test.disruption.ServiceDisruptionScheme; +import org.opensearch.test.telemetry.MockTelemetry; +import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; import org.opensearch.transport.TransportSettings; @@ -137,6 +140,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.Objects; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.TreeMap; @@ -957,16 +961,25 @@ public Client smartClient() { @Override public synchronized void close() throws IOException { + /** + * MockTracingTelemetry:validateTracingStateOnShutdown validates the shared span storage across all the nodes. + * Hence, we just need any one node to ensure the correctness of the system. + */ + Node anyNodeToGetRefOfTelemetry = null; if (this.open.compareAndSet(true, false)) { if (activeDisruptionScheme != null) { activeDisruptionScheme.testClusterClosed(); activeDisruptionScheme = null; } try { + if (nodes.firstEntry() != null && nodes.firstEntry().getValue() != null) { + anyNodeToGetRefOfTelemetry = nodes.firstEntry().getValue().node; + } IOUtils.close(nodes.values()); } finally { nodes = Collections.emptyNavigableMap(); executor.shutdownNow(); + validateTracingTerminalState(anyNodeToGetRefOfTelemetry); } } } @@ -1908,19 +1921,38 @@ private void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException stopNodesAndClients(Collections.singleton(nodeAndClient)); } + /** + * Stop the nodes and clients. + * @throws IOException exception + */ + public void stopNodesAndClients() throws IOException { + stopNodesAndClients(nodes.values()); + } + private synchronized void stopNodesAndClients(Collection nodeAndClients) throws IOException { final Set excludedNodeIds = excludeClusterManagers(nodeAndClients); - for (NodeAndClient nodeAndClient : nodeAndClients) { removeDisruptionSchemeFromNode(nodeAndClient); final NodeAndClient previous = removeNode(nodeAndClient); assert previous == nodeAndClient; nodeAndClient.close(); } - removeExclusions(excludedNodeIds); } + private void validateTracingTerminalState(Node node) { + if (node != null) { + Optional telemetry = ((MockNode) node).getTelemetry(); + if (isValidMockTracingTelemetry(telemetry)) { + ((MockTracingTelemetry) telemetry.get().getTracingTelemetry()).validateTracingStateOnShutdown(); + } + } + } + + private static boolean isValidMockTracingTelemetry(Optional telemetry) { + return telemetry.isPresent() && (telemetry.get() instanceof MockTelemetry) && (telemetry.get().getTracingTelemetry() != null); + } + /** * Restarts a random data node in the cluster */ diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index c15283a7ea245..c80bb2fbaea19 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2085,7 +2085,7 @@ protected boolean addMockGeoShapeFieldMapper() { * @return boolean. */ protected boolean addMockTelemetryPlugin() { - return false; + return true; } /** diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java index 2de4c8fdbdfb8..072fb14547291 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java @@ -71,8 +71,11 @@ import org.opensearch.script.MockScriptService; import org.opensearch.search.SearchService; import org.opensearch.search.internal.SearchContext; +import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.test.telemetry.MockTelemetry; import org.opensearch.test.telemetry.MockTelemetryPlugin; +import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; import org.opensearch.transport.TransportSettings; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -83,6 +86,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.TimeUnit; import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING; @@ -99,10 +103,12 @@ public abstract class OpenSearchSingleNodeTestCase extends OpenSearchTestCase { private static Node NODE = null; + private static Node nodeForTracingStrictCheck = null; protected void startNode(long seed) throws Exception { assert NODE == null; NODE = RandomizedContext.current().runWithPrivateRandomness(seed, this::newNode); + nodeForTracingStrictCheck = NODE; // we must wait for the node to actually be up and running. otherwise the node might have started, // elected itself cluster-manager but might not yet have removed the // SERVICE_UNAVAILABLE/1/state not recovered / initialized block @@ -190,6 +196,21 @@ public static void setUpClass() throws Exception { @AfterClass public static void tearDownClass() throws Exception { stopNode(); + validateTracingTerminalState(nodeForTracingStrictCheck); + nodeForTracingStrictCheck = null; + } + + private static void validateTracingTerminalState(Node node) { + if (node != null) { + Optional telemetry = ((MockNode) node).getTelemetry(); + if (isValidMockTracingTelemetry(telemetry)) { + ((MockTracingTelemetry) telemetry.get().getTracingTelemetry()).validateTracingStateOnShutdown(); + } + } + } + + private static boolean isValidMockTracingTelemetry(Optional telemetry) { + return telemetry.isPresent() && telemetry.get() instanceof MockTelemetry && telemetry.get().getTracingTelemetry() != null; } /** diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java index 894e8a67cea1f..6b428a7f65594 100644 --- a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java @@ -13,46 +13,22 @@ import org.opensearch.telemetry.metrics.MetricsTelemetry; import org.opensearch.telemetry.tracing.TracingTelemetry; import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; -import org.opensearch.threadpool.ThreadPool; - -import java.util.concurrent.TimeUnit; /** * Mock {@link Telemetry} implementation for testing. */ public class MockTelemetry implements Telemetry { - private final ThreadPool threadPool; - /** * Constructor with settings. * @param settings telemetry settings. */ public MockTelemetry(TelemetrySettings settings) { - this(settings, null); - } - /** - * Constructor with settings. - * @param settings telemetry settings. - * @param threadPool thread pool to watch for termination - */ - public MockTelemetry(TelemetrySettings settings, ThreadPool threadPool) { - this.threadPool = threadPool; } @Override public TracingTelemetry getTracingTelemetry() { - return new MockTracingTelemetry(() -> { - // There could be some asynchronous tasks running that we should await for before the closing - // up the tracer instance. - if (threadPool != null) { - try { - threadPool.awaitTermination(10, TimeUnit.SECONDS); - } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - }); + return new MockTracingTelemetry(); } @Override diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java index ebba9857aa8f1..4f483098caf82 100644 --- a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java @@ -8,34 +8,18 @@ package org.opensearch.test.telemetry; -import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.SetOnce; -import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.env.Environment; -import org.opensearch.env.NodeEnvironment; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.TelemetryPlugin; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.script.ScriptService; import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.watcher.ResourceWatcherService; -import java.util.Collection; -import java.util.Collections; import java.util.Optional; -import java.util.function.Supplier; /** * Mock {@link TelemetryPlugin} implementation for testing. */ public class MockTelemetryPlugin extends Plugin implements TelemetryPlugin { private static final String MOCK_TRACER_NAME = "mock"; - private final SetOnce threadPool = new SetOnce<>(); /** * Base constructor. @@ -44,27 +28,9 @@ public MockTelemetryPlugin() { } - @Override - public Collection createComponents( - Client client, - ClusterService clusterService, - ThreadPool threadPool, - ResourceWatcherService resourceWatcherService, - ScriptService scriptService, - NamedXContentRegistry xContentRegistry, - Environment environment, - NodeEnvironment nodeEnvironment, - NamedWriteableRegistry namedWriteableRegistry, - IndexNameExpressionResolver indexNameExpressionResolver, - Supplier repositoriesServiceSupplier - ) { - this.threadPool.set(threadPool); - return Collections.emptyList(); - } - @Override public Optional getTelemetry(TelemetrySettings settings) { - return Optional.of(new MockTelemetry(settings, threadPool.get())); + return Optional.of(new MockTelemetry(settings)); } @Override diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java index a5e51dd27541b..99108d4fa9f8c 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java @@ -17,6 +17,7 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * Mock {@link TracingTelemetry} implementation for testing. @@ -24,28 +25,19 @@ public class MockTracingTelemetry implements TracingTelemetry { private final SpanProcessor spanProcessor = new StrictCheckSpanProcessor(); - private final Runnable onClose; + private final AtomicBoolean shutdown = new AtomicBoolean(false); /** * Base constructor. */ - public MockTracingTelemetry() { - this(() -> {}); - } - - /** - * Base constructor. - * - * @param onClose on close hook - */ - public MockTracingTelemetry(final Runnable onClose) { - this.onClose = onClose; - } + public MockTracingTelemetry() {} @Override public Span createSpan(String spanName, Span parentSpan, Attributes attributes) { Span span = new MockSpan(spanName, parentSpan, spanProcessor, attributes); - spanProcessor.onStart(span); + if (shutdown.get() == false) { + spanProcessor.onStart(span); + } return span; } @@ -56,15 +48,25 @@ public TracingContextPropagator getContextPropagator() { @Override public void close() { - // Run onClose hook - onClose.run(); + shutdown.set(true); + } + /** + * Ensures the strict check succeeds for all the spans. + */ + public void validateTracingStateOnShutdown() { List spanData = ((StrictCheckSpanProcessor) spanProcessor).getFinishedSpanItems(); if (spanData.size() != 0) { TelemetryValidators validators = new TelemetryValidators( Arrays.asList(new AllSpansAreEndedProperly(), new AllSpansHaveUniqueId()) ); - validators.validate(spanData, 1); + try { + validators.validate(spanData, 1); + } catch (Error e) { + ((StrictCheckSpanProcessor) spanProcessor).clear(); + throw e; + } } + } } diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/StrictCheckSpanProcessor.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/StrictCheckSpanProcessor.java index 6fc3c9cc55bbf..203f77f1550c9 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/StrictCheckSpanProcessor.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/StrictCheckSpanProcessor.java @@ -24,7 +24,7 @@ public class StrictCheckSpanProcessor implements SpanProcessor { */ public StrictCheckSpanProcessor() {} - private Map spanMap = new ConcurrentHashMap<>(); + private static Map spanMap = new ConcurrentHashMap<>(); @Override public void onStart(Span span) { @@ -61,4 +61,11 @@ private MockSpanData toMockSpanData(Span span) { ); return spanData; } + + /** + * Clears the StrictCheck span storage. + */ + public void clear() { + spanMap.clear(); + } }