From 3bcd5bb40f3c7d285b3783cc5158364eda6b9778 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Thu, 7 Nov 2024 11:25:17 -0800 Subject: [PATCH] Add GrpcStats to NodeStats. Fix gRPC server stats interceptor prematurely going out of scope. Signed-off-by: Finn Carroll --- .../netty4/Netty4GrpcServerTransport.java | 35 ++++++++++--------- .../admin/cluster/node/stats/NodeStats.java | 16 +++++++++ .../cluster/node/stats/NodesStatsRequest.java | 1 + .../node/stats/TransportNodesStatsAction.java | 1 + .../stats/TransportClusterStatsAction.java | 1 + .../java/org/opensearch/node/NodeService.java | 2 ++ .../MockInternalClusterInfoService.java | 1 + .../opensearch/test/InternalTestCluster.java | 1 + 8 files changed, 42 insertions(+), 16 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/opensearch/grpc/netty4/Netty4GrpcServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/grpc/netty4/Netty4GrpcServerTransport.java index ed565d480b8e1..e273d9fc2a696 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/grpc/netty4/Netty4GrpcServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/grpc/netty4/Netty4GrpcServerTransport.java @@ -46,13 +46,15 @@ public class Netty4GrpcServerTransport extends AbstractGrpcServerTransport { private final SharedGroupFactory sharedGroupFactory; private final CopyOnWriteArrayList servers = new CopyOnWriteArrayList<>(); - private final ServerStatsInterceptor sharedServerStatsInterceptor; private volatile SharedGroupFactory.SharedGroup sharedGroup; + private final ServerStatsInterceptor sharedServerStatsInterceptor; + private final AtomicLong currentOpen = new AtomicLong(0); + private final AtomicLong totalOpened = new AtomicLong(0); public Netty4GrpcServerTransport(Settings settings, NetworkService networkService, SharedGroupFactory sharedGroupFactory) { super(settings, networkService); this.sharedGroupFactory = sharedGroupFactory; - this.sharedServerStatsInterceptor = new ServerStatsInterceptor(); + this.sharedServerStatsInterceptor = new ServerStatsInterceptor(currentOpen, totalOpened); } @Override @@ -133,12 +135,17 @@ protected void doClose() {} @Override public GrpcStats stats() { - return new GrpcStats(sharedServerStatsInterceptor.getActiveConnections(), sharedServerStatsInterceptor.getTotalRequests()); + return new GrpcStats(totalOpened.get(), currentOpen.get()); } static class ServerStatsInterceptor implements ServerInterceptor { - private final AtomicLong activeConnections = new AtomicLong(); - private final AtomicLong totalRequests = new AtomicLong(); + private final AtomicLong currentOpen; + private final AtomicLong totalOpened; + + ServerStatsInterceptor(AtomicLong currentOpen, AtomicLong totalOpened) { + this.currentOpen = currentOpen; + this.totalOpened = totalOpened; + } @Override public ServerCall.Listener interceptCall( @@ -146,28 +153,24 @@ public ServerCall.Listener interceptCall( Metadata headers, ServerCallHandler next ) { + logger.debug("Intercepted call - Method: {}, Authority: {}, Headers: {}", + call.getMethodDescriptor().getFullMethodName(), + call.getAuthority(), + headers); - activeConnections.incrementAndGet(); - totalRequests.incrementAndGet(); + currentOpen.incrementAndGet(); + totalOpened.incrementAndGet(); return new ForwardingServerCallListener.SimpleForwardingServerCallListener( next.startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) { @Override public void close(Status status, Metadata trailers) { - activeConnections.decrementAndGet(); + currentOpen.decrementAndGet(); super.close(status, trailers); } }, headers) ) { }; } - - public long getActiveConnections() { - return activeConnections.get(); - } - - public long getTotalRequests() { - return totalRequests.get(); - } } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index eb79e3403a25c..068a8c160f076 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -46,6 +46,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.DiscoveryStats; +import org.opensearch.grpc.GrpcStats; import org.opensearch.http.HttpStats; import org.opensearch.index.SegmentReplicationRejectionStats; import org.opensearch.index.stats.IndexingPressureStats; @@ -106,6 +107,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private HttpStats http; + @Nullable + private GrpcStats grpc; + @Nullable private AllCircuitBreakerStats breaker; @@ -179,6 +183,7 @@ public NodeStats(StreamInput in) throws IOException { fs = in.readOptionalWriteable(FsInfo::new); transport = in.readOptionalWriteable(TransportStats::new); http = in.readOptionalWriteable(HttpStats::new); + grpc = in.readOptionalWriteable(GrpcStats::new); breaker = in.readOptionalWriteable(AllCircuitBreakerStats::new); scriptStats = in.readOptionalWriteable(ScriptStats::new); discoveryStats = in.readOptionalWriteable(DiscoveryStats::new); @@ -265,6 +270,7 @@ public NodeStats( @Nullable FsInfo fs, @Nullable TransportStats transport, @Nullable HttpStats http, + @Nullable GrpcStats grpc, @Nullable AllCircuitBreakerStats breaker, @Nullable ScriptStats scriptStats, @Nullable DiscoveryStats discoveryStats, @@ -296,6 +302,7 @@ public NodeStats( this.fs = fs; this.transport = transport; this.http = http; + this.grpc = grpc; this.breaker = breaker; this.scriptStats = scriptStats; this.discoveryStats = discoveryStats; @@ -385,6 +392,11 @@ public HttpStats getHttp() { return this.http; } + @Nullable + public GrpcStats getGrpc() { + return this.grpc; + } + @Nullable public AllCircuitBreakerStats getBreaker() { return this.breaker; @@ -500,6 +512,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(fs); out.writeOptionalWriteable(transport); out.writeOptionalWriteable(http); + out.writeOptionalWriteable(grpc); out.writeOptionalWriteable(breaker); out.writeOptionalWriteable(scriptStats); out.writeOptionalWriteable(discoveryStats); @@ -592,6 +605,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getHttp() != null) { getHttp().toXContent(builder, params); } + if (getGrpc() != null) { + getGrpc().toXContent(builder, params); + } if (getBreaker() != null) { getBreaker().toXContent(builder, params); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index a5b00ed82d3cb..153d84b53d2c5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -202,6 +202,7 @@ public enum Metric { FS("fs"), TRANSPORT("transport"), HTTP("http"), + GRPC("grpc"), BREAKER("breaker"), SCRIPT("script"), DISCOVERY("discovery"), diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index a98d245af872b..9d8791589e6f3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -111,6 +111,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.FS.containedIn(metrics), NodesStatsRequest.Metric.TRANSPORT.containedIn(metrics), NodesStatsRequest.Metric.HTTP.containedIn(metrics), + NodesStatsRequest.Metric.GRPC.containedIn(metrics), NodesStatsRequest.Metric.BREAKER.containedIn(metrics), NodesStatsRequest.Metric.SCRIPT.containedIn(metrics), NodesStatsRequest.Metric.DISCOVERY.containedIn(metrics), diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index c6581b99eb559..3f49fa6f26d8a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -182,6 +182,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, isMetricRequired(Metric.INGEST, nodeRequest.request), false, false, diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index b05c89e70be16..81c050a7c7835 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -228,6 +228,7 @@ public NodeStats stats( boolean fs, boolean transport, boolean http, + boolean grpc, boolean circuitBreaker, boolean script, boolean discoveryStats, @@ -262,6 +263,7 @@ public NodeStats stats( fs ? monitorService.fsService().stats() : null, transport ? transportService.stats() : null, http ? (httpServerTransport == null ? null : httpServerTransport.stats()) : null, + grpc ? (grpcServerTransport == null ? null : grpcServerTransport.stats()) : null, circuitBreaker ? circuitBreakerService.stats() : null, script ? scriptService.stats() : null, discoveryStats ? discovery.stats() : null, diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index ded457601c0ae..aeeaedc7dd321 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -107,6 +107,7 @@ List adjustNodesStats(List nodesStats) { ), nodeStats.getTransport(), nodeStats.getHttp(), + nodeStats.getGrpc(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(), diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index fa5fb736f518f..3595bf6567de2 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2753,6 +2753,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(