Merge branch 'apache:trunk' into YARN-11011

slfan1989 authored Oct 27, 2023
2 parents 0599f31 + e4eda40 commit 14cfab9
Showing 90 changed files with 2,784 additions and 482 deletions.
62 changes: 31 additions & 31 deletions LICENSE-binary
@@ -257,36 +257,36 @@ io.grpc:grpc-netty:1.26.0
io.grpc:grpc-protobuf:1.26.0
io.grpc:grpc-protobuf-lite:1.26.0
io.grpc:grpc-stub:1.26.0
io.netty:netty-all:4.1.94.Final
io.netty:netty-buffer:4.1.94.Final
io.netty:netty-codec:4.1.94.Final
io.netty:netty-codec-dns:4.1.94.Final
io.netty:netty-codec-haproxy:4.1.94.Final
io.netty:netty-codec-http:4.1.94.Final
io.netty:netty-codec-http2:4.1.94.Final
io.netty:netty-codec-memcache:4.1.94.Final
io.netty:netty-codec-mqtt:4.1.94.Final
io.netty:netty-codec-redis:4.1.94.Final
io.netty:netty-codec-smtp:4.1.94.Final
io.netty:netty-codec-socks:4.1.94.Final
io.netty:netty-codec-stomp:4.1.94.Final
io.netty:netty-codec-xml:4.1.94.Final
io.netty:netty-common:4.1.94.Final
io.netty:netty-handler:4.1.94.Final
io.netty:netty-handler-proxy:4.1.94.Final
io.netty:netty-resolver:4.1.94.Final
io.netty:netty-resolver-dns:4.1.94.Final
io.netty:netty-transport:4.1.94.Final
io.netty:netty-transport-rxtx:4.1.94.Final
io.netty:netty-transport-sctp:4.1.94.Final
io.netty:netty-transport-udt:4.1.94.Final
io.netty:netty-transport-classes-epoll:4.1.94.Final
io.netty:netty-transport-native-unix-common:4.1.94.Final
io.netty:netty-transport-classes-kqueue:4.1.94.Final
io.netty:netty-resolver-dns-classes-macos:4.1.94.Final
io.netty:netty-transport-native-epoll:4.1.94.Final
io.netty:netty-transport-native-kqueue:4.1.94.Final
io.netty:netty-resolver-dns-native-macos:4.1.94.Final
io.netty:netty-all:4.1.100.Final
io.netty:netty-buffer:4.1.100.Final
io.netty:netty-codec:4.1.100.Final
io.netty:netty-codec-dns:4.1.100.Final
io.netty:netty-codec-haproxy:4.1.100.Final
io.netty:netty-codec-http:4.1.100.Final
io.netty:netty-codec-http2:4.1.100.Final
io.netty:netty-codec-memcache:4.1.100.Final
io.netty:netty-codec-mqtt:4.1.100.Final
io.netty:netty-codec-redis:4.1.100.Final
io.netty:netty-codec-smtp:4.1.100.Final
io.netty:netty-codec-socks:4.1.100.Final
io.netty:netty-codec-stomp:4.1.100.Final
io.netty:netty-codec-xml:4.1.100.Final
io.netty:netty-common:4.1.100.Final
io.netty:netty-handler:4.1.100.Final
io.netty:netty-handler-proxy:4.1.100.Final
io.netty:netty-resolver:4.1.100.Final
io.netty:netty-resolver-dns:4.1.100.Final
io.netty:netty-transport:4.1.100.Final
io.netty:netty-transport-rxtx:4.1.100.Final
io.netty:netty-transport-sctp:4.1.100.Final
io.netty:netty-transport-udt:4.1.100.Final
io.netty:netty-transport-classes-epoll:4.1.100.Final
io.netty:netty-transport-native-unix-common:4.1.100.Final
io.netty:netty-transport-classes-kqueue:4.1.100.Final
io.netty:netty-resolver-dns-classes-macos:4.1.100.Final
io.netty:netty-transport-native-epoll:4.1.100.Final
io.netty:netty-transport-native-kqueue:4.1.100.Final
io.netty:netty-resolver-dns-native-macos:4.1.100.Final
io.opencensus:opencensus-api:0.12.3
io.opencensus:opencensus-contrib-grpc-metrics:0.12.3
io.reactivex:rxjava:1.3.8
@@ -337,7 +337,7 @@ org.apache.kerby:kerby-xdr:2.0.3
org.apache.kerby:token-provider:2.0.3
org.apache.solr:solr-solrj:8.11.2
org.apache.yetus:audience-annotations:0.5.0
org.apache.zookeeper:zookeeper:3.6.3
org.apache.zookeeper:zookeeper:3.7.2
org.codehaus.jettison:jettison:1.5.4
org.eclipse.jetty:jetty-annotations:9.4.51.v20230217
org.eclipse.jetty:jetty-http:9.4.51.v20230217
@@ -504,6 +504,10 @@ public class CommonConfigurationKeysPublic {
"ipc.server.log.slow.rpc";
public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;

public static final String IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY =
"ipc.server.log.slow.rpc.threshold.ms";
public static final long IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT = 0;

public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY =
"ipc.server.purge.interval";
public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15;
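The new constants above add a millisecond threshold next to the existing ipc.server.log.slow.rpc flag. The following is a minimal sketch of how a server-side check could consult both values; it assumes only the keys and defaults visible in this hunk, while the class, variable names, and the sample timing are illustrative rather than part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

public class SlowRpcThresholdSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    boolean logSlowRpc = conf.getBoolean("ipc.server.log.slow.rpc",
        CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT);
    long thresholdMs = conf.getLong(
        CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
        CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT);
    long processingMs = 750L; // hypothetical measured processing time of one call
    // The call is a candidate for slow-RPC logging only when the feature is on and
    // the call took longer than the configured floor (a threshold of 0 imposes no floor).
    if (logSlowRpc && processingMs > thresholdMs) {
      System.out.println("would log slow RPC taking " + processingMs + " ms");
    }
  }
}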
@@ -244,6 +244,13 @@ public final class StoreStatisticNames {
public static final String OBJECT_MULTIPART_UPLOAD_ABORTED =
"object_multipart_aborted";

/**
* Object multipart list request.
* Value :{@value}.
*/
public static final String OBJECT_MULTIPART_UPLOAD_LIST =
"object_multipart_list";

/**
* Object put/multipart upload count.
* Value :{@value}.
@@ -29,8 +29,10 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
@@ -48,6 +50,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.naming.ConfigurationException;

import org.apache.hadoop.security.SecurityUtil.TruststoreKeystore;

/**
*
* This class implements a simple library to perform leader election on top of
@@ -170,6 +176,7 @@ enum State {
private final int zkSessionTimeout;
private final List<ACL> zkAcl;
private final List<ZKAuthInfo> zkAuthInfo;
private TruststoreKeystore truststoreKeystore;
private byte[] appData;
private final String zkLockFilePath;
private final String zkBreadCrumbPath;
@@ -209,6 +216,7 @@ enum State {
* @param app
* reference to callback interface object
* @param maxRetryNum maxRetryNum.
* @param truststoreKeystore the truststore and keystore to use for the ZooKeeper client if SSL/TLS is enabled
* @throws IOException raised on errors performing I/O.
* @throws HadoopIllegalArgumentException
* if valid data is not supplied.
@@ -218,10 +226,10 @@ enum State {
public ActiveStandbyElector(String zookeeperHostPorts,
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
int maxRetryNum) throws IOException, HadoopIllegalArgumentException,
KeeperException {
int maxRetryNum, TruststoreKeystore truststoreKeystore)
throws IOException, HadoopIllegalArgumentException, KeeperException {
this(zookeeperHostPorts, zookeeperSessionTimeout, parentZnodeName, acl,
authInfo, app, maxRetryNum, true);
authInfo, app, maxRetryNum, true, truststoreKeystore);
}

/**
@@ -254,6 +262,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
* @param failFast
* whether need to add the retry when establishing ZK connection.
* @param maxRetryNum max Retry Num
* @param truststoreKeystore the truststore and keystore to use for the ZooKeeper client if SSL/TLS is enabled
* @throws IOException
* raised on errors performing I/O.
* @throws HadoopIllegalArgumentException
@@ -264,7 +273,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
public ActiveStandbyElector(String zookeeperHostPorts,
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
int maxRetryNum, boolean failFast) throws IOException,
int maxRetryNum, boolean failFast, TruststoreKeystore truststoreKeystore) throws IOException,
HadoopIllegalArgumentException, KeeperException {
if (app == null || acl == null || parentZnodeName == null
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
@@ -279,6 +288,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
this.maxRetryNum = maxRetryNum;
this.truststoreKeystore = truststoreKeystore;

// establish the ZK Connection for future API calls
if (failFast) {
@@ -740,7 +750,19 @@ protected synchronized ZooKeeper connectToZooKeeper() throws IOException,
* @throws IOException raised on errors performing I/O.
*/
protected ZooKeeper createZooKeeper() throws IOException {
return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
ZKClientConfig zkClientConfig = new ZKClientConfig();
if (truststoreKeystore != null) {
try {
SecurityUtil.setSslConfiguration(zkClientConfig, truststoreKeystore);
} catch (ConfigurationException ce) {
throw new IOException(ce);
}
}
return initiateZookeeper(zkClientConfig);
}

protected ZooKeeper initiateZookeeper(ZKClientConfig zkClientConfig) throws IOException {
return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher, zkClientConfig);
}

private void fatalError(String errorMessage) {
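For context on the createZooKeeper() change above: when a TruststoreKeystore is supplied, SecurityUtil.setSslConfiguration() is expected to populate the ZKClientConfig with the standard ZooKeeper client TLS properties before the client is built. The sketch below shows equivalent wiring directly against ZKClientConfig; it is illustrative only, the actual property handling lives in SecurityUtil, and the store paths and passwords are placeholders.

import java.io.IOException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;

public class SecureZkClientSketch {
  public static ZooKeeper connect(String hostPort, int sessionTimeoutMs, Watcher watcher)
      throws IOException {
    ZKClientConfig zkClientConfig = new ZKClientConfig();
    // Standard ZooKeeper client TLS properties (names from the ZooKeeper client docs);
    // in the patch these are derived from the TruststoreKeystore via SecurityUtil.
    zkClientConfig.setProperty("zookeeper.client.secure", "true");
    zkClientConfig.setProperty("zookeeper.clientCnxnSocket",
        "org.apache.zookeeper.ClientCnxnSocketNetty");
    zkClientConfig.setProperty("zookeeper.ssl.keyStore.location", "/path/to/keystore.jks");
    zkClientConfig.setProperty("zookeeper.ssl.keyStore.password", "keystore-password");
    zkClientConfig.setProperty("zookeeper.ssl.trustStore.location", "/path/to/truststore.jks");
    zkClientConfig.setProperty("zookeeper.ssl.trustStore.password", "truststore-password");
    return new ZooKeeper(hostPort, sessionTimeoutMs, watcher, zkClientConfig);
  }
}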
@@ -59,6 +59,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.security.SecurityUtil.TruststoreKeystore;

@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController {

@@ -147,6 +149,7 @@ protected abstract void checkRpcAdminAccess()
protected abstract InetSocketAddress getRpcAddressToBindTo();
protected abstract PolicyProvider getPolicyProvider();
protected abstract List<HAServiceTarget> getAllOtherNodes();
protected abstract boolean isSSLEnabled();
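The new abstract isSSLEnabled() lets each concrete failover controller decide whether its ZooKeeper connection should use TLS, which in turn controls whether a TruststoreKeystore is created in initZK() below. A hypothetical override is sketched here; it is only the method body, assumed to live in a concrete ZKFailoverController subclass that keeps its own Configuration reference, and the configuration key name is purely illustrative and not taken from this patch.

// Hypothetical override in a concrete ZKFailoverController subclass; myConf is the
// subclass's Configuration field and the key name below is illustrative only.
@Override
protected boolean isSSLEnabled() {
  return myConf.getBoolean("ha.zookeeper.client.ssl.enabled", false);
}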

/**
* Return the name of a znode inside the configured parent znode in which
@@ -372,9 +375,10 @@ private void initZK() throws HadoopIllegalArgumentException, IOException,
int maxRetryNum = conf.getInt(
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
TruststoreKeystore truststoreKeystore = isSSLEnabled() ? new TruststoreKeystore(conf) : null;
elector = new ActiveStandbyElector(zkQuorum,
zkTimeout, getParentZnode(), zkAcls, zkAuths,
new ElectorCallbacks(), maxRetryNum);
new ElectorCallbacks(), maxRetryNum, truststoreKeystore);
}

private String getParentZnode() {
@@ -18,6 +18,9 @@

package org.apache.hadoop.ipc;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -63,7 +66,7 @@ static Class<? extends RpcScheduler> convertSchedulerClass(
}

private volatile boolean clientBackOffEnabled;
private boolean serverFailOverEnabled;
private volatile boolean serverFailOverEnabled;

// Atomic refs point to active callQueue
// We have two so we can better control swapping
@@ -81,18 +84,15 @@ public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
namespace, conf);
int[] capacityWeights = parseCapacityWeights(priorityLevels,
namespace, conf);
this.serverFailOverEnabled = getServerFailOverEnable(namespace, conf);
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
priorityLevels, maxQueueSize, namespace, capacityWeights, conf);
this.clientBackOffEnabled = clientBackOffEnabled;
this.serverFailOverEnabled = conf.getBoolean(
namespace + "." +
CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
LOG.info("Using callQueue: {}, queueCapacity: {}, " +
"scheduler: {}, ipcBackoff: {}.",
backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled);
"scheduler: {}, ipcBackoff: {}, ipcFailOver: {}.",
backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled, serverFailOverEnabled);
}

@VisibleForTesting // only!
@@ -105,6 +105,41 @@ public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
this.serverFailOverEnabled = serverFailOverEnabled;
}

/**
* Return the boolean value configured by the property 'ipc.<port>.callqueue.overflow.trigger.failover'
* if it is present. If the port-specific config is not present, the default config
* (without port), i.e. 'ipc.callqueue.overflow.trigger.failover', is used, and its
* value is returned if configured. Otherwise, the default value
* {@link CommonConfigurationKeys#IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT} is returned.
*
* @param namespace Namespace "ipc" + "." + Server's listener port.
* @param conf Configuration properties.
* @return Value returned based on configuration.
*/
private boolean getServerFailOverEnable(String namespace, Configuration conf) {
String propertyKey = namespace + "." +
CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE;

if (conf.get(propertyKey) != null) {
return conf.getBoolean(propertyKey,
CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
}

String[] nsPort = namespace.split("\\.");
if (nsPort.length == 2) {
// Only if the namespace splits into exactly two parts can we separate the namespace and the port.
// In the absence of "ipc.<port>.callqueue.overflow.trigger.failover" property,
// we look up "ipc.callqueue.overflow.trigger.failover" property.
return conf.getBoolean(nsPort[0] + "."
+ IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
}

// Otherwise return default value.
LOG.info("{} not specified set default value is {}",
IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
return CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT;
}
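A short illustration of the lookup order implemented above, using only the Hadoop Configuration API and the two key names quoted in the javadoc; the port numbers are arbitrary examples.

import org.apache.hadoop.conf.Configuration;

public class FailoverConfigLookupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Port-less default applies to every IPC server unless overridden per port.
    conf.setBoolean("ipc.callqueue.overflow.trigger.failover", true);
    // Port-specific setting wins for the server listening on 8020.
    conf.setBoolean("ipc.8020.callqueue.overflow.trigger.failover", false);

    // For namespace "ipc.8020" the port-specific key exists, so the result is false.
    // For namespace "ipc.9000" only the port-less key exists, so the result is true.
    // With neither key set, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT is returned.
    System.out.println(conf.getBoolean("ipc.8020.callqueue.overflow.trigger.failover", false));
    System.out.println(conf.getBoolean("ipc.callqueue.overflow.trigger.failover", false));
  }
}

Note that the swapQueue() change near the end of this diff re-evaluates the same lookup, so a refreshed configuration picks up a new value without a server restart.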

private static <T extends RpcScheduler> T createScheduler(
Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
// Used for custom, configurable scheduler
@@ -155,9 +190,9 @@ private <T extends BlockingQueue<E>> T createCallQueueInstance(
// Used for custom, configurable callqueues
try {
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
int.class, String.class, int[].class, Configuration.class);
return ctor.newInstance(priorityLevels, maxLen, ns,
capacityWeights, conf);
int.class, String.class, int[].class, boolean.class, Configuration.class);
return ctor.newInstance(priorityLevels, maxLen, ns, capacityWeights,
this.serverFailOverEnabled, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
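Because the reflection lookup above now targets an (int, int, String, int[], boolean, Configuration) constructor, a custom call queue implementation plugged into CallQueueManager needs to accept the extra boolean. A minimal compatible sketch is below; the class name is hypothetical and the implementation only stores the flag.

import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;

// Hypothetical custom call queue whose constructor matches the signature that
// CallQueueManager.createCallQueueInstance() now looks up via reflection.
public class MyCallQueue<E> extends LinkedBlockingQueue<E> {
  private final boolean serverFailOverEnabled;

  public MyCallQueue(int priorityLevels, int capacity, String namespace,
      int[] capacityWeights, boolean serverFailOverEnabled, Configuration conf) {
    super(capacity);
    this.serverFailOverEnabled = serverFailOverEnabled;
    // priorityLevels, namespace, capacityWeights and conf remain available for
    // scheduling decisions; this sketch keeps only the failover flag.
  }

  public boolean isServerFailOverEnabled() {
    return serverFailOverEnabled;
  }
}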
@@ -199,6 +234,20 @@ boolean isClientBackoffEnabled() {
return clientBackOffEnabled;
}

@VisibleForTesting
public boolean isServerFailOverEnabled() {
return serverFailOverEnabled;
}

@VisibleForTesting
public boolean isServerFailOverEnabledByQueue() {
BlockingQueue<E> bq = putRef.get();
if (bq instanceof FairCallQueue) {
return ((FairCallQueue<E>) bq).isServerFailOverEnabled();
}
return false;
}

// Based on policy to determine back off current call
boolean shouldBackOff(Schedulable e) {
return scheduler.shouldBackOff(e);
@@ -421,6 +470,9 @@ public synchronized void swapQueue(
RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
ns, conf);
int[] capacityWeights = parseCapacityWeights(priorityLevels, ns, conf);

// Update serverFailOverEnabled.
this.serverFailOverEnabled = getServerFailOverEnable(ns, conf);
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
priorityLevels, maxSize, ns, capacityWeights, conf);
