Skip to content

Commit

Permalink
[hadoop 3.3] LIHADOOP-75599 : Feature in RouterRpcClient to abort inv…
Browse files Browse the repository at this point in the history
…oking RPCs to the NameNode when the initial peer caller is closed prior to invocation (apache#411)
  • Loading branch information
Raymond Lam authored Apr 4, 2024
1 parent b1e7006 commit 28e27d6
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
*
* @return true
*/
boolean isOpen() {
public boolean isOpen() {
return true;
}

Expand Down Expand Up @@ -1018,6 +1018,13 @@ public void setDeferredError(Throwable t) {
public long getTimestampNanos() {
return timestampNanos;
}

/**
* Connection of the incoming peer making the call
*/
public Connection getConnection() {
return null;
}
}

/** A RPC extended call queued for handling. */
Expand Down Expand Up @@ -1056,7 +1063,7 @@ private class RpcCall extends Call {
}

@Override
boolean isOpen() {
public boolean isOpen() {
return connection.channel.isOpen();
}

Expand All @@ -1081,6 +1088,11 @@ public InetAddress getHostInetAddress() {
return connection.getHostInetAddress();
}

@Override
public Connection getConnection() {
return connection;
}

@Override
public int getRemotePort() {
return connection.getRemotePort();
Expand Down Expand Up @@ -1996,6 +2008,10 @@ public Server getServer() {
return Server.this;
}

public SocketChannel getChannel() {
return channel;
}

/* Return true if the connection has no outstanding rpc */
private boolean isIdle() {
return rpcCount.get() == 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,10 @@ public interface FederationRPCMBean {
* @return Number of operations accepted of each namespace.
*/
String getProxyOpPermitAcceptedPerNs();

/**
* Get the number of operations skipped due to incoming peer closing its Connection Channel.
* @return Number of operations skipped due to incoming peer closing its Connection Channel.
*/
long getSkippedProxyOpPeerClosedChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class FederationRPCMetrics implements FederationRPCMBean {

@Metric("Number of operations to hit permit limits")
private MutableCounterLong proxyOpPermitRejected;

@Metric("Number of RPCs skipped due to incoming peer closing its Connection Channel prior to RPC invocation")
private MutableCounterLong skippedProxyOpPeerClosedChannel;

public FederationRPCMetrics(Configuration conf, RouterRpcServer rpcServer) {
this.rpcServer = rpcServer;
Expand Down Expand Up @@ -322,4 +325,13 @@ public String getProxyOpPermitRejectedPerNs() {
public String getProxyOpPermitAcceptedPerNs() {
return rpcServer.getRPCClient().getAcceptedPermitsPerNsJSON();
}

public void incrSkippedProxyOpPeerClosedChannel() {
skippedProxyOpPeerClosedChannel.incr();
}

@Override
public long getSkippedProxyOpPeerClosedChannel() {
return skippedProxyOpPeerClosedChannel.value();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;


/**
* Exception when the incoming peer (caller of the local router) has closed their Connection Channel before
* the local router has proxied or retried their RPC.
*/
public class PeerClosedConnectionChannelException extends IOException {

private static final long serialVersionUID = 1L;

public PeerClosedConnectionChannelException(String msg) {
super(msg);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
NoRouterRpcFairnessPolicyController.class;
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";

// LIHADOOP-75599 : Feature flag to enable skipping RPCs upon closed connection of incoming peer to this router
public static final String DFS_ROUTER_CLIENT_SKIP_CLOSED_CONNECTION_RPC_ENABLED =
FEDERATION_ROUTER_PREFIX + "client.skip-closed-connection-rpc.enabled";
public static final boolean DFS_ROUTER_CLIENT_SKIP_CLOSED_CONNECTION_RPC_ENABLED_DEFAULT = false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public class RouterRpcClient {
private Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>();

private final boolean enableProxyUser;

private final boolean skipClosedConnectionRpc;

/**
* Create a router RPC client to manage remote procedure calls to NNs.
Expand Down Expand Up @@ -244,6 +246,10 @@ public RouterRpcClient(Configuration conf, Router router,
activeNNStateIdRefreshPeriodMs);
}
this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>();

this.skipClosedConnectionRpc = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_CLIENT_SKIP_CLOSED_CONNECTION_RPC_ENABLED,
RBFConfigKeys.DFS_ROUTER_CLIENT_SKIP_CLOSED_CONNECTION_RPC_ENABLED_DEFAULT);
}

/**
Expand Down Expand Up @@ -371,6 +377,15 @@ public String getAsyncCallerPoolJson() {
return JSON.toString(info);
}

/**
* Get the connectionManager of this client.
* @return connectionManager.
*/
@VisibleForTesting
public ConnectionManager getConnectionManager() {
return connectionManager;
}

/**
* JSON representation of the rejected permits for each nameservice.
*
Expand Down Expand Up @@ -401,7 +416,8 @@ public String getAcceptedPermitsPerNsJSON() {
* NN + current user.
* @throws IOException If we cannot get a connection to the NameNode.
*/
private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
@VisibleForTesting
public ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
String rpcAddress, Class<?> proto) throws IOException {
ConnectionContext connection = null;
try {
Expand Down Expand Up @@ -534,6 +550,24 @@ private Object invokeMethod(
connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
ProxyAndInfo<?> client = connection.getClient();
final Object proxy = client.getProxy();

// LIHADOOP-75599: Check to see if the incoming peer is still open before trying to issue the RPC Call.
if (skipClosedConnectionRpc) {
final Call curCall = Server.getCurCall().get();
if (curCall != null && !curCall.isOpen()) {
String msg = "Aborting " + method.getName() + " due to Closed Connection Channel of incoming caller peer "
+ CallerContext.CLIENT_IP_STR + "=" + Server.getRemoteAddress() + ", "
+ CallerContext.CLIENT_PORT_STR + "=" + Integer.toString(Server.getRemotePort()) + ", "
+ CallerContext.CLIENT_ID_STR + "=" + StringUtils.byteToHexString(Server.getClientId()) + ", "
+ CallerContext.CLIENT_CALL_ID_STR + "=" + Integer.toString(Server.getCallId());
LOG.info(msg);

if (rpcMonitor != null) {
rpcMonitor.getRPCMetrics().incrSkippedProxyOpPeerClosedChannel();
}
throw new PeerClosedConnectionChannelException(msg);
}
}

ret = invoke(nsId, 0, method, proxy, params);
if (failover &&
Expand Down Expand Up @@ -598,6 +632,8 @@ private Object invokeMethod(
nsId, rpcAddress, ioe.getMessage());
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
} else if (ioe instanceof PeerClosedConnectionChannelException) {
throw ioe;
} else {
// Other communication error, this is a failure
// Communication retries are handled by the retry policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionContext;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
Expand All @@ -83,6 +84,7 @@
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -399,6 +401,64 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
spyConnectionManager);
}

/**
* Simulate that a RouterRpcServer, connection between it and client is
* closed before sending rpc to namenode.
* @param server RouterRpcServer
* @param reservedRpcClientList the list of original rpcClient
* @throws IOException
*/
public static void simulateClientConnectionClose(
final RouterRpcServer server,
final List<RouterRpcClient> reservedRpcClientList)
throws IOException {
RouterRpcClient rpcClient = server.getRPCClient();
RouterRpcClient spyClient = spy(rpcClient);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
UserGroupInformation ugi =
invocation.getArgument(0, UserGroupInformation.class);
String nsId = invocation.getArgument(1, String.class);
String rpcAddress = invocation.getArgument(2, String.class);
Class cls = invocation.getArgument(3, Class.class);

ConnectionContext connection = null;
UserGroupInformation connUGI = ugi;
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation routerUser = UserGroupInformation.getLoginUser();
connUGI = UserGroupInformation.createProxyUser(
ugi.getUserName(), routerUser);
}
connection = spyClient.getConnectionManager()
.getConnection(connUGI, rpcAddress, cls, nsId);

// Close the connection channel of current call
Server.Call curCall = Server.getCurCall().get();
if (curCall.getConnection() != null) {
curCall.getConnection().getChannel().close();
}
return connection;
}
}).when(spyClient).getConnection(any(UserGroupInformation.class),
any(String.class), any(String.class), any(Class.class));

reservedRpcClientList.add(rpcClient);
Whitebox.setInternalState(server.getClientProtocolModule(),
"rpcClient", spyClient);
}

/**
* Recovery the rpcClient of RouterRpcServer.
* @param server RouterRpcServer
* @param rpcClient RpcClient
*/
public static void recoveryClientConnection(final RouterRpcServer server,
final RouterRpcClient rpcClient) {
Whitebox.setInternalState(server.getClientProtocolModule(),
"rpcClient", rpcClient);
}

/**
* Switch namenodes of all hdfs name services to standby.
* @param cluster a federated HDFS cluster
Expand Down
Loading

0 comments on commit 28e27d6

Please sign in to comment.