Skip to content

Commit

Permalink
Fix the telemetry strict check framework
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Sep 6, 2023
1 parent 41a7804 commit dc78ebe
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public Path nodeConfigPath(int nodeOrdinal) {
assertThat(second.nodes().getSize(), equalTo(1));
assertThat(first.nodes().getClusterManagerNodeId(), not(equalTo(second.nodes().getClusterManagerNodeId())));
assertThat(first.metadata().clusterUUID(), not(equalTo(second.metadata().clusterUUID())));
internalCluster().stopNodesAndClients();
}
}

Expand Down Expand Up @@ -178,6 +179,7 @@ public Path nodeConfigPath(int nodeOrdinal) {
final ClusterState first = internalCluster().getInstance(ClusterService.class).state();
assertThat(first.nodes().getSize(), equalTo(1));
assertBusy(() -> mockAppender.assertAllExpectationsMatched());
internalCluster().stopNodesAndClients();
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.TaskResultsService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetryModule;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.tracing.NoopTracerFactory;
Expand Down Expand Up @@ -392,6 +393,8 @@ public static class DiscoverySettings {
private FileCache fileCache;
private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;

private final Telemetry telemetry;

/**
 * Creates a node for the given environment by delegating to the three-argument constructor,
 * passing an empty list and {@code true} for the remaining arguments.
 *
 * @param environment the environment forwarded to the delegated constructor
 */
public Node(Environment environment) {
this(environment, Collections.emptyList(), true);
}
Expand Down Expand Up @@ -587,9 +590,11 @@ protected Node(
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
telemetry = telemetryModule.getTelemetry().orElse(null);
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext());
} else {
tracerFactory = new NoopTracerFactory();
telemetry = null;
}

tracer = tracerFactory.getTracer();
Expand Down Expand Up @@ -1805,4 +1810,8 @@ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreake
/**
 * Returns the {@link FileCache} currently held in this node's {@code fileCache} field.
 *
 * @return the value of the {@code fileCache} field
 */
public FileCache fileCache() {
return this.fileCache;
}

/**
 * Returns the telemetry held by this node, wrapped in an {@link Optional}.
 *
 * @return the {@code telemetry} field, or an empty {@link Optional} when that field is {@code null}
 */
protected Optional<Telemetry> getTelemetry() {
return Optional.ofNullable(telemetry);
}
}
37 changes: 19 additions & 18 deletions server/src/main/java/org/opensearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -865,15 +865,15 @@ public final <T extends TransportResponse> void sendRequest(
final TransportRequestOptions options,
final TransportResponseHandler<T> handler
) {
try {
logger.debug("Action: " + action);
final Span span = tracer.startSpan(SpanBuilder.from(action, connection));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
final TransportResponseHandler<T> traceableTransportResponseHandler = TraceableTransportResponseHandler.create(
handler,
span,
tracer
);
logger.debug("Action: " + action);
final Span span = tracer.startSpan(SpanBuilder.from(action, connection));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
final TransportResponseHandler<T> traceableTransportResponseHandler = TraceableTransportResponseHandler.create(
handler,
span,
tracer
);
try {
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
Expand Down Expand Up @@ -913,16 +913,17 @@ public String toString() {
delegate = traceableTransportResponseHandler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
span.endSpan();
traceableTransportResponseHandler.handleException(te);
}
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
handler.handleException(te);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.search.SearchService;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.test.MockHttpTransport;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -72,6 +73,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
Expand Down Expand Up @@ -272,4 +274,9 @@ protected void configureNodeAndClusterIdStateListener(ClusterService clusterServ
/**
 * Returns the {@link NamedWriteableRegistry} stored in the {@code namedWriteableRegistry} field.
 *
 * @return the value of the {@code namedWriteableRegistry} field
 */
public NamedWriteableRegistry getNamedWriteableRegistry() {
return namedWriteableRegistry;
}

/**
 * Publicly exposes the superclass implementation of {@code getTelemetry()} without altering its result.
 *
 * @return whatever {@code super.getTelemetry()} returns
 */
@Override
public Optional<Telemetry> getTelemetry() {
return super.getTelemetry();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchService;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.telemetry.MockTelemetry;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
Expand All @@ -137,6 +140,7 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
Expand Down Expand Up @@ -957,16 +961,25 @@ public Client smartClient() {

@Override
public synchronized void close() throws IOException {
    // Only the caller that flips the open flag from true to false performs the shutdown.
    if (this.open.compareAndSet(true, false) == false) {
        return;
    }
    if (activeDisruptionScheme != null) {
        activeDisruptionScheme.testClusterClosed();
        activeDisruptionScheme = null;
    }
    /*
     * MockTracingTelemetry#validateTracingStateOnShutdown validates the span storage that is shared across
     * all the nodes, so a reference to any single node is enough to check the terminal tracing state.
     */
    Node telemetrySourceNode = null;
    try {
        // Capture the node reference before the nodes are closed and the map is replaced below.
        if (nodes.firstEntry() != null && nodes.firstEntry().getValue() != null) {
            telemetrySourceNode = nodes.firstEntry().getValue().node;
        }
        IOUtils.close(nodes.values());
    } finally {
        nodes = Collections.emptyNavigableMap();
        executor.shutdownNow();
        validateTracingTerminalState(telemetrySourceNode);
    }
}
Expand Down Expand Up @@ -1908,19 +1921,38 @@ private void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException
stopNodesAndClients(Collections.singleton(nodeAndClient));
}

/**
 * Stops all nodes and clients currently held in {@code nodes} by delegating to
 * {@link #stopNodesAndClients(Collection)} with {@code nodes.values()}.
 *
 * @throws IOException if the delegated call throws it
 */
public void stopNodesAndClients() throws IOException {
stopNodesAndClients(nodes.values());
}

/**
 * Stops the given nodes one by one. For each node the disruption scheme is removed, the node is
 * removed from the cluster's bookkeeping and then closed. The ids returned by
 * {@code excludeClusterManagers} before the loop are handed to {@code removeExclusions} afterwards.
 *
 * @param nodeAndClients the nodes to stop
 * @throws IOException if closing a node throws it
 */
private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) throws IOException {
    final Set<String> exclusionIds = excludeClusterManagers(nodeAndClients);

    for (NodeAndClient current : nodeAndClients) {
        removeDisruptionSchemeFromNode(current);
        // The instance removed from the bookkeeping must be the very one being stopped.
        final NodeAndClient removed = removeNode(current);
        assert removed == current;
        current.close();
    }

    removeExclusions(exclusionIds);
}

/**
 * Runs {@code validateTracingStateOnShutdown()} on the tracing telemetry of the given node, provided the
 * node is non-null and its telemetry passes {@link #isValidMockTracingTelemetry(Optional)}.
 *
 * @param node the node whose telemetry is inspected; {@code null} makes this a no-op
 */
private void validateTracingTerminalState(Node node) {
    if (node == null) {
        return;
    }
    final Optional<Telemetry> nodeTelemetry = ((MockNode) node).getTelemetry();
    if (isValidMockTracingTelemetry(nodeTelemetry) == false) {
        return;
    }
    final MockTracingTelemetry tracingTelemetry = (MockTracingTelemetry) nodeTelemetry.get().getTracingTelemetry();
    tracingTelemetry.validateTracingStateOnShutdown();
}

/**
 * Tells whether the given telemetry is present, is a {@link MockTelemetry}, and yields a non-null
 * tracing telemetry.
 *
 * @param telemetry the optional telemetry to inspect
 * @return {@code true} only when all three conditions hold
 */
private static boolean isValidMockTracingTelemetry(Optional<Telemetry> telemetry) {
    // Optional#map yields an empty Optional when the mapper returns null, which covers the null check.
    return telemetry.filter(candidate -> candidate instanceof MockTelemetry)
        .map(Telemetry::getTracingTelemetry)
        .isPresent();
}

/**
* Restarts a random data node in the cluster
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2085,7 +2085,7 @@ protected boolean addMockGeoShapeFieldMapper() {
* @return boolean.
*/
protected boolean addMockTelemetryPlugin() {
return false;
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@
import org.opensearch.script.MockScriptService;
import org.opensearch.search.SearchService;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.test.telemetry.MockTelemetry;
import org.opensearch.test.telemetry.MockTelemetryPlugin;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;
import org.opensearch.transport.TransportSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -83,6 +86,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
Expand All @@ -99,10 +103,12 @@
public abstract class OpenSearchSingleNodeTestCase extends OpenSearchTestCase {

private static Node NODE = null;
private static Node nodeForTracingStrictCheck = null;

protected void startNode(long seed) throws Exception {
assert NODE == null;
NODE = RandomizedContext.current().runWithPrivateRandomness(seed, this::newNode);
nodeForTracingStrictCheck = NODE;
// we must wait for the node to actually be up and running. otherwise the node might have started,
// elected itself cluster-manager but might not yet have removed the
// SERVICE_UNAVAILABLE/1/state not recovered / initialized block
Expand Down Expand Up @@ -190,6 +196,21 @@ public static void setUpClass() throws Exception {
/**
 * Stops the shared node and then validates the terminal tracing state of the node that was started
 * for this class.
 *
 * @throws Exception if stopping the node or the validation fails
 */
@AfterClass
public static void tearDownClass() throws Exception {
    try {
        stopNode();
        validateTracingTerminalState(nodeForTracingStrictCheck);
    } finally {
        // Always drop the static reference, even when stopping the node or the validation throws,
        // so that a stopped node is not kept reachable from this class afterwards.
        nodeForTracingStrictCheck = null;
    }
}

/**
 * Runs {@code validateTracingStateOnShutdown()} on the tracing telemetry of the given node, provided the
 * node is non-null and its telemetry passes {@link #isValidMockTracingTelemetry(Optional)}.
 *
 * @param node the node whose telemetry is inspected; {@code null} makes this a no-op
 */
private static void validateTracingTerminalState(Node node) {
    if (node == null) {
        return;
    }
    final Optional<Telemetry> nodeTelemetry = ((MockNode) node).getTelemetry();
    if (isValidMockTracingTelemetry(nodeTelemetry) == false) {
        return;
    }
    final MockTracingTelemetry tracingTelemetry = (MockTracingTelemetry) nodeTelemetry.get().getTracingTelemetry();
    tracingTelemetry.validateTracingStateOnShutdown();
}

/**
 * Tells whether the given telemetry is present, is a {@link MockTelemetry}, and yields a non-null
 * tracing telemetry.
 *
 * @param telemetry the optional telemetry to inspect
 * @return {@code true} only when all three conditions hold
 */
private static boolean isValidMockTracingTelemetry(Optional<Telemetry> telemetry) {
    if (telemetry.isPresent() == false) {
        return false;
    }
    final Telemetry resolved = telemetry.get();
    if ((resolved instanceof MockTelemetry) == false) {
        return false;
    }
    return resolved.getTracingTelemetry() != null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,22 @@
import org.opensearch.telemetry.metrics.MetricsTelemetry;
import org.opensearch.telemetry.tracing.TracingTelemetry;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

/**
* Mock {@link Telemetry} implementation for testing.
*/
public class MockTelemetry implements Telemetry {
private final ThreadPool threadPool;

/**
* Constructor with settings.
* @param settings telemetry settings.
*/
public MockTelemetry(TelemetrySettings settings) {
this(settings, null);
}

/**
* Constructor with settings.
* @param settings telemetry settings.
* @param threadPool thread pool to watch for termination
*/
public MockTelemetry(TelemetrySettings settings, ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public TracingTelemetry getTracingTelemetry() {
return new MockTracingTelemetry(() -> {
// There could be some asynchronous tasks running that we should await for before the closing
// up the tracer instance.
if (threadPool != null) {
try {
threadPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
});
return new MockTracingTelemetry();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,18 @@

package org.opensearch.test.telemetry;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Mock {@link TelemetryPlugin} implementation for testing.
*/
public class MockTelemetryPlugin extends Plugin implements TelemetryPlugin {
private static final String MOCK_TRACER_NAME = "mock";
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();

/**
* Base constructor.
Expand All @@ -44,27 +28,9 @@ public MockTelemetryPlugin() {

}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.threadPool.set(threadPool);
return Collections.emptyList();
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(new MockTelemetry(settings, threadPool.get()));
return Optional.of(new MockTelemetry(settings));
}

@Override
Expand Down
Loading

0 comments on commit dc78ebe

Please sign in to comment.