Skip to content

Commit

Permalink
[SPARK-25642][YARN] Adding two new metrics to record the number of re…
Browse files Browse the repository at this point in the history
…gistered connections as well as the number of active connections to YARN Shuffle Service

Recently, the ability to expose the metrics for YARN Shuffle Service was added as part of [SPARK-18364](apache#22485). We need to add some metrics to be able to determine the number of active connections as well as open connections to the external shuffle service to benchmark network and connection issues on large cluster environments.

Added two more shuffle server metrics for Spark Yarn shuffle service: numRegisteredConnections which indicate the number of registered connections to the shuffle service and numActiveConnections which indicate the number of active connections to the shuffle service at any given point in time.

If these metrics are outputted to a file, we get something like this:

1533674653489 default.shuffleService: Hostname=server1.abc.com, openBlockRequestLatencyMillis_count=729, openBlockRequestLatencyMillis_rate15=0.7110833548897356, openBlockRequestLatencyMillis_rate5=1.657808981793011, openBlockRequestLatencyMillis_rate1=2.2404486061620474, openBlockRequestLatencyMillis_rateMean=0.9242558551196706,
numRegisteredConnections=35,
blockTransferRateBytes_count=2635880512, blockTransferRateBytes_rate15=2578547.6094160094, blockTransferRateBytes_rate5=6048721.726302424, blockTransferRateBytes_rate1=8548922.518223226, blockTransferRateBytes_rateMean=3341878.633637769, registeredExecutorsSize=5, registerExecutorRequestLatencyMillis_count=5, registerExecutorRequestLatencyMillis_rate15=0.0027973949328659836, registerExecutorRequestLatencyMillis_rate5=0.0021278007987206426, registerExecutorRequestLatencyMillis_rate1=2.8270296777387467E-6, registerExecutorRequestLatencyMillis_rateMean=0.006339206380043053, numActiveConnections=35

Closes apache#22498 from pgandhi999/SPARK-18364.

Authored-by: pgandhi <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
  • Loading branch information
pgandhi authored and Marcelo Vanzin committed Dec 21, 2018
1 parent d6a5f85 commit 8dd29fe
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;

import com.codahale.metrics.Counter;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -66,6 +67,8 @@ public class TransportContext {
private final RpcHandler rpcHandler;
private final boolean closeIdleConnections;
private final boolean isClientOnly;
// Number of registered connections to the shuffle service
private Counter registeredConnections = new Counter();

/**
* Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created
Expand Down Expand Up @@ -221,7 +224,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
conf.connectionTimeoutMs(), closeIdleConnections, this);
}

/**
Expand All @@ -234,4 +237,8 @@ private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler
}

public TransportConf getConf() { return conf; }

public Counter getRegisteredConnections() {
return registeredConnections;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.spark.network.TransportContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,18 +58,21 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
private final TransportRequestHandler requestHandler;
private final long requestTimeoutNs;
private final boolean closeIdleConnections;
private final TransportContext transportContext;

public TransportChannelHandler(
TransportClient client,
TransportResponseHandler responseHandler,
TransportRequestHandler requestHandler,
long requestTimeoutMs,
boolean closeIdleConnections) {
boolean closeIdleConnections,
TransportContext transportContext) {
this.client = client;
this.responseHandler = responseHandler;
this.requestHandler = requestHandler;
this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
this.closeIdleConnections = closeIdleConnections;
this.transportContext = transportContext;
}

public TransportClient getClient() {
Expand Down Expand Up @@ -176,4 +180,16 @@ public TransportResponseHandler getResponseHandler() {
return responseHandler;
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
transportContext.getRegisteredConnections().inc();
super.channelRegistered(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
transportContext.getRegisteredConnections().dec();
super.channelUnregistered(ctx);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricSet;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -159,4 +160,8 @@ public void close() {
}
bootstrap = null;
}

public Counter getRegisteredConnections() {
return context.getRegisteredConnections();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -173,22 +174,29 @@ private void checkAuth(TransportClient client, String appId) {
/**
* A simple class to wrap all shuffle service wrapper metrics
*/
private class ShuffleMetrics implements MetricSet {
@VisibleForTesting
public class ShuffleMetrics implements MetricSet {
private final Map<String, Metric> allMetrics;
// Time latency for open block request in ms
private final Timer openBlockRequestLatencyMillis = new Timer();
// Time latency for executor registration latency in ms
private final Timer registerExecutorRequestLatencyMillis = new Timer();
// Block transfer rate in byte per second
private final Meter blockTransferRateBytes = new Meter();
// Number of active connections to the shuffle service
private Counter activeConnections = new Counter();
// Number of registered connections to the shuffle service
private Counter registeredConnections = new Counter();

private ShuffleMetrics() {
public ShuffleMetrics() {
allMetrics = new HashMap<>();
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
allMetrics.put("registeredExecutorsSize",
(Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
allMetrics.put("numActiveConnections", activeConnections);
allMetrics.put("numRegisteredConnections", registeredConnections);
}

@Override
Expand Down Expand Up @@ -244,4 +252,16 @@ public ManagedBuffer next() {
}
}

@Override
public void channelActive(TransportClient client) {
metrics.activeConnections.inc();
super.channelActive(client);
}

@Override
public void channelInactive(TransportClient client) {
metrics.activeConnections.dec();
super.channelInactive(client);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,6 @@ protected void serviceInit(Configuration conf) throws Exception {
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);

// register metrics on the block handler into the Node Manager's metrics system.
YarnShuffleServiceMetrics serviceMetrics =
new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());

MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
metricsSystem.register(
"sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");

// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
Expand All @@ -199,6 +190,18 @@ protected void serviceInit(Configuration conf) throws Exception {
port = shuffleServer.getPort();
boundPort = port;
String authEnabledString = authEnabled ? "enabled" : "not enabled";

// register metrics on the block handler into the Node Manager's metrics system.
blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
shuffleServer.getRegisteredConnections());
YarnShuffleServiceMetrics serviceMetrics =
new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());

MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
metricsSystem.register(
"sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");

logger.info("Started YARN shuffle service for Spark on port {}. " +
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
registeredExecutorFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public static void collectMetric(
throw new IllegalStateException(
"Not supported class type of metric[" + name + "] for value " + gaugeValue);
}
} else if (metric instanceof Counter) {
Counter c = (Counter) metric;
long counterValue = c.getCount();
metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, "Number of " +
"connections to shuffle service " + name), counterValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
server = transportContext.createServer(port, bootstraps.asJava)

shuffleServiceSource.registerMetricSet(server.getAllMetrics)
blockHandler.getAllMetrics.getMetrics.put("numRegisteredConnections",
server.getRegisteredConnections)
shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics)
masterMetricsSystem.registerSource(shuffleServiceSource)
masterMetricsSystem.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
test("metrics named as expected") {
val allMetrics = Set(
"openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis",
"blockTransferRateBytes", "registeredExecutorsSize")
"blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections",
"numRegisteredConnections")

metrics.getMetrics.keySet().asScala should be (allMetrics)
}
Expand Down

0 comments on commit 8dd29fe

Please sign in to comment.