Skip to content

Commit

Permalink
HADOOP-19362. RPC metrics should be updated correctly when call is de…
Browse files Browse the repository at this point in the history
…fered. (apache#7224). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <[email protected]>
Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
hfutatzhanghb authored Dec 20, 2024
1 parent 6cb2e86 commit f32a937
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
Expand All @@ -34,7 +35,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -393,28 +393,38 @@ static class ProtobufRpcEngineCallbackImpl
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;

public ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
}

private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
ProcessingDetails details = rpcCall.getProcessingDetails();
rpcCall.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos,
TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
updateProcessingDetails(call, deltaNanos);
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
server.updateDeferredMetrics(call, methodName, deltaNanos);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
updateProcessingDetails(call, deltaNanos);
call.setDeferredError(t);
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.ProcessingDetails.Timing;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -425,28 +426,37 @@ static class ProtobufRpcEngineCallbackImpl
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;

ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
}

private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
ProcessingDetails details = rpcCall.getProcessingDetails();
rpcCall.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
updateProcessingDetails(call, deltaNanos);
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
server.updateDeferredMetrics(call, methodName, deltaNanos);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
updateProcessingDetails(call, deltaNanos);
call.setDeferredError(t);
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,13 @@ public static Server get() {
* after the call returns.
*/
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();

/** @return Get the current call. */
@VisibleForTesting
public static ThreadLocal<Call> getCurCall() {
return CurCall;
}

/**
* Returns the currently active RPC call's sequential ID number. A negative
* call ID indicates an invalid value, such as if there is no currently active
Expand Down Expand Up @@ -638,7 +638,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
rpcMetrics.addRpcQueueTime(queueTime);

if (call.isResponseDeferred() || connDropped) {
// call was skipped; don't include it in processing metrics
// The call was skipped; don't include it in processing metrics.
// Will update metrics in method updateDeferredMetrics.
return;
}

Expand Down Expand Up @@ -668,9 +669,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
}
}

void updateDeferredMetrics(String name, long processingTime) {
/**
* Update rpc metrics for defered calls.
* @param call The Rpc Call
* @param name Rpc method name
* @param processingTime processing call in ms unit.
*/
void updateDeferredMetrics(Call call, String name, long processingTime) {
long completionTimeNanos = Time.monotonicNowNanos();
long arrivalTimeNanos = call.timestampNanos;

ProcessingDetails details = call.getProcessingDetails();
long waitTime =
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
long responseTime =
details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcLockWaitTime(waitTime);
rpcMetrics.addRpcProcessingTime(processingTime);
rpcMetrics.addRpcResponseTime(responseTime);
rpcMetrics.addDeferredRpcProcessingTime(processingTime);
rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
// don't include lock wait for detailed metrics.
processingTime -= waitTime;
rpcDetailedMetrics.addProcessingTime(name, processingTime);

// Overall processing time is from arrival to completion.
long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
.convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
callQueue.addResponseTime(name, call, details);
if (isLogSlowRPC()) {
logSlowRpcCalls(name, call, details);
}
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
rpcMetrics.incrRpcCallSuccesses();
}
}

/**
Expand Down Expand Up @@ -963,6 +996,7 @@ public static class Call implements Schedulable,
final int callId; // the client's call id
final int retryCount; // the retry count of the call
private final long timestampNanos; // time the call was received
protected long startHandleTimestampNanos; // time the call was run
long responseTimestampNanos; // time the call was served
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
Expand Down Expand Up @@ -1167,6 +1201,15 @@ public void setDeferredError(Throwable t) {
public long getTimestampNanos() {
return timestampNanos;
}


public long getStartHandleTimestampNanos() {
return startHandleTimestampNanos;
}

public void setStartHandleTimestampNanos(long startHandleTimestampNanos) {
this.startHandleTimestampNanos = startHandleTimestampNanos;
}
}

/** A RPC extended call queued for handling. */
Expand Down Expand Up @@ -1243,6 +1286,7 @@ public Void run() throws Exception {
}

long startNanos = Time.monotonicNowNanos();
this.setStartHandleTimestampNanos(startNanos);
Writable value = null;
ResponseParams responseParams = new ResponseParams();

Expand Down Expand Up @@ -1331,6 +1375,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException {
* Send a deferred response, ignoring errors.
*/
private void sendDeferedResponse() {
long startNanos = Time.monotonicNowNanos();
try {
connection.sendResponse(this);
} catch (Exception e) {
Expand All @@ -1342,6 +1387,8 @@ private void sendDeferedResponse() {
.currentThread().getName() + ", CallId="
+ callId + ", hostname=" + getHostAddress());
}
getProcessingDetails().set(Timing.RESPONSE,
Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
Expand All @@ -26,25 +27,35 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

public class TestProtoBufRpcServerHandoff {

public static final Logger LOG =
LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class);

@Test(timeout = 20000)
public void test() throws Exception {
Configuration conf = new Configuration();
private static Configuration conf = null;
private static RPC.Server server = null;
private static InetSocketAddress address = null;

@Before
public void setUp() throws IOException {
conf = new Configuration();

TestProtoBufRpcServerHandoffServer serverImpl =
new TestProtoBufRpcServerHandoffServer();
Expand All @@ -53,18 +64,21 @@ public void test() throws Exception {

RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class,
ProtobufRpcEngine2.class);
RPC.Server server = new RPC.Builder(conf)
server = new RPC.Builder(conf)
.setProtocol(TestProtoBufRpcServerHandoffProtocol.class)
.setInstance(blockingService)
.setVerbose(true)
.setNumHandlers(1) // Num Handlers explicitly set to 1 for test.
.build();
server.start();

InetSocketAddress address = server.getListenerAddress();
address = server.getListenerAddress();
long serverStartTime = System.currentTimeMillis();
LOG.info("Server started at: " + address + " at time: " + serverStartTime);
}

@Test(timeout = 20000)
public void test() throws Exception {
final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);

Expand Down Expand Up @@ -93,6 +107,40 @@ public void test() throws Exception {

}

@Test(timeout = 20000)
public void testHandoffMetrics() throws Exception {
final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);

ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletionService<ClientInvocationCallable> completionService =
new ExecutorCompletionService<ClientInvocationCallable>(
executorService);

completionService.submit(new ClientInvocationCallable(client, 5000L));
completionService.submit(new ClientInvocationCallable(client, 5000L));

long submitTime = System.currentTimeMillis();
Future<ClientInvocationCallable> future1 = completionService.take();
Future<ClientInvocationCallable> future2 = completionService.take();

ClientInvocationCallable callable1 = future1.get();
ClientInvocationCallable callable2 = future2.get();

LOG.info(callable1.toString());
LOG.info(callable2.toString());

// Ensure the 5 second sleep responses are within a reasonable time of each
// other.
Assert.assertTrue(Math.abs(callable1.endTime - callable2.endTime) < 2000L);
Assert.assertTrue(System.currentTimeMillis() - submitTime < 7000L);

// Check rpcMetrics
MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
assertCounterGt("DeferredRpcProcessingTimeNumOps", 1L, rb);
assertCounter("RpcProcessingTimeNumOps", 2L, rb);
}

private static class ClientInvocationCallable
implements Callable<ClientInvocationCallable> {
final TestProtoBufRpcServerHandoffProtocol client;
Expand Down

0 comments on commit f32a937

Please sign in to comment.