Skip to content

Commit

Permalink
Add GrpcStats to NodeStats. Fix gRPC server stats interceptor prematu…
Browse files Browse the repository at this point in the history
…rely going out of scope.

Signed-off-by: Finn Carroll <[email protected]>
  • Loading branch information
finnegancarroll committed Nov 7, 2024
1 parent ca6b2d3 commit 3bcd5bb
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ public class Netty4GrpcServerTransport extends AbstractGrpcServerTransport {

private final SharedGroupFactory sharedGroupFactory;
private final CopyOnWriteArrayList<Server> servers = new CopyOnWriteArrayList<>();
private final ServerStatsInterceptor sharedServerStatsInterceptor;
private volatile SharedGroupFactory.SharedGroup sharedGroup;
private final ServerStatsInterceptor sharedServerStatsInterceptor;
private final AtomicLong currentOpen = new AtomicLong(0);
private final AtomicLong totalOpened = new AtomicLong(0);

public Netty4GrpcServerTransport(Settings settings, NetworkService networkService, SharedGroupFactory sharedGroupFactory) {
super(settings, networkService);
this.sharedGroupFactory = sharedGroupFactory;
this.sharedServerStatsInterceptor = new ServerStatsInterceptor();
this.sharedServerStatsInterceptor = new ServerStatsInterceptor(currentOpen, totalOpened);
}

@Override
Expand Down Expand Up @@ -133,41 +135,42 @@ protected void doClose() {}

@Override
public GrpcStats stats() {
return new GrpcStats(sharedServerStatsInterceptor.getActiveConnections(), sharedServerStatsInterceptor.getTotalRequests());
return new GrpcStats(totalOpened.get(), currentOpen.get());
}

static class ServerStatsInterceptor implements ServerInterceptor {
private final AtomicLong activeConnections = new AtomicLong();
private final AtomicLong totalRequests = new AtomicLong();
private final AtomicLong currentOpen;
private final AtomicLong totalOpened;

ServerStatsInterceptor(AtomicLong currentOpen, AtomicLong totalOpened) {
this.currentOpen = currentOpen;
this.totalOpened = totalOpened;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next
) {
logger.debug("Intercepted call - Method: {}, Authority: {}, Headers: {}",
call.getMethodDescriptor().getFullMethodName(),
call.getAuthority(),
headers);

activeConnections.incrementAndGet();
totalRequests.incrementAndGet();
currentOpen.incrementAndGet();
totalOpened.incrementAndGet();

return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
activeConnections.decrementAndGet();
currentOpen.decrementAndGet();
super.close(status, trailers);
}
}, headers)
) {
};
}

public long getActiveConnections() {
return activeConnections.get();
}

public long getTotalRequests() {
return totalRequests.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.grpc.GrpcStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.SegmentReplicationRejectionStats;
import org.opensearch.index.stats.IndexingPressureStats;
Expand Down Expand Up @@ -106,6 +107,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private HttpStats http;

@Nullable
private GrpcStats grpc;

@Nullable
private AllCircuitBreakerStats breaker;

Expand Down Expand Up @@ -179,6 +183,7 @@ public NodeStats(StreamInput in) throws IOException {
fs = in.readOptionalWriteable(FsInfo::new);
transport = in.readOptionalWriteable(TransportStats::new);
http = in.readOptionalWriteable(HttpStats::new);
grpc = in.readOptionalWriteable(GrpcStats::new);
breaker = in.readOptionalWriteable(AllCircuitBreakerStats::new);
scriptStats = in.readOptionalWriteable(ScriptStats::new);
discoveryStats = in.readOptionalWriteable(DiscoveryStats::new);
Expand Down Expand Up @@ -265,6 +270,7 @@ public NodeStats(
@Nullable FsInfo fs,
@Nullable TransportStats transport,
@Nullable HttpStats http,
@Nullable GrpcStats grpc,
@Nullable AllCircuitBreakerStats breaker,
@Nullable ScriptStats scriptStats,
@Nullable DiscoveryStats discoveryStats,
Expand Down Expand Up @@ -296,6 +302,7 @@ public NodeStats(
this.fs = fs;
this.transport = transport;
this.http = http;
this.grpc = grpc;
this.breaker = breaker;
this.scriptStats = scriptStats;
this.discoveryStats = discoveryStats;
Expand Down Expand Up @@ -385,6 +392,11 @@ public HttpStats getHttp() {
return this.http;
}

@Nullable
public GrpcStats getGrpc() {
return this.grpc;
}

@Nullable
public AllCircuitBreakerStats getBreaker() {
return this.breaker;
Expand Down Expand Up @@ -500,6 +512,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(fs);
out.writeOptionalWriteable(transport);
out.writeOptionalWriteable(http);
out.writeOptionalWriteable(grpc);
out.writeOptionalWriteable(breaker);
out.writeOptionalWriteable(scriptStats);
out.writeOptionalWriteable(discoveryStats);
Expand Down Expand Up @@ -592,6 +605,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getHttp() != null) {
getHttp().toXContent(builder, params);
}
if (getGrpc() != null) {
getGrpc().toXContent(builder, params);
}
if (getBreaker() != null) {
getBreaker().toXContent(builder, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public enum Metric {
FS("fs"),
TRANSPORT("transport"),
HTTP("http"),
GRPC("grpc"),
BREAKER("breaker"),
SCRIPT("script"),
DISCOVERY("discovery"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.FS.containedIn(metrics),
NodesStatsRequest.Metric.TRANSPORT.containedIn(metrics),
NodesStatsRequest.Metric.HTTP.containedIn(metrics),
NodesStatsRequest.Metric.GRPC.containedIn(metrics),
NodesStatsRequest.Metric.BREAKER.containedIn(metrics),
NodesStatsRequest.Metric.SCRIPT.containedIn(metrics),
NodesStatsRequest.Metric.DISCOVERY.containedIn(metrics),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
isMetricRequired(Metric.INGEST, nodeRequest.request),
false,
false,
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public NodeStats stats(
boolean fs,
boolean transport,
boolean http,
boolean grpc,
boolean circuitBreaker,
boolean script,
boolean discoveryStats,
Expand Down Expand Up @@ -262,6 +263,7 @@ public NodeStats stats(
fs ? monitorService.fsService().stats() : null,
transport ? transportService.stats() : null,
http ? (httpServerTransport == null ? null : httpServerTransport.stats()) : null,
grpc ? (grpcServerTransport == null ? null : grpcServerTransport.stats()) : null,
circuitBreaker ? circuitBreakerService.stats() : null,
script ? scriptService.stats() : null,
discoveryStats ? discovery.stats() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ List<NodeStats> adjustNodesStats(List<NodeStats> nodesStats) {
),
nodeStats.getTransport(),
nodeStats.getHttp(),
nodeStats.getGrpc(),
nodeStats.getBreaker(),
nodeStats.getScriptStats(),
nodeStats.getDiscoveryStats(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2753,6 +2753,7 @@ public void ensureEstimatedStats() {
false,
false,
false,
false,
false
);
assertThat(
Expand Down

0 comments on commit 3bcd5bb

Please sign in to comment.