Skip to content

Commit

Permalink
HADOOP-19156. ZooKeeper based state stores use different ZK address c…
Browse files Browse the repository at this point in the history
…onfigs. (#6767). Contributed by liu bin.

Signed-off-by: Ayush Saxena <[email protected]>
Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
liubin101 authored May 29, 2024
1 parent f4fde40 commit 6c08e8e
Show file tree
Hide file tree
Showing 23 changed files with 86 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
* Start the connection to the ZooKeeper ensemble.
* @throws IOException If the connection cannot be started.
*/
public void start() throws IOException{
public void start() throws IOException {
  // No authentication keys supplied: delegate with an empty list.
  List<AuthInfo> noAuthInfos = new ArrayList<>();
  this.start(noAuthInfos);
}

Expand All @@ -139,25 +139,47 @@ public void start(List<AuthInfo> authInfos) throws IOException {
this.start(authInfos, false);
}

/**
 * Start the connection to the ZooKeeper ensemble at the given address,
 * using an empty list of authentication keys and SSL/TLS disabled.
 * @param zkHostPort Host:Port of the ZooKeeper.
 * @throws IOException If the connection cannot be started.
 */
public void start(String zkHostPort) throws IOException {
  // Nothing to authenticate with; hand an empty list to the full overload.
  List<AuthInfo> noAuthInfos = new ArrayList<>();
  this.start(noAuthInfos, false, zkHostPort);
}

/**
 * Start the connection to the ZooKeeper ensemble without naming an explicit
 * address; a null address is handed to the three-argument overload.
 * @param authInfos List of authentication keys.
 * @param sslEnabled If the connection should be SSL/TLS encrypted.
 * @throws IOException If the connection cannot be started.
 */
public void start(List<AuthInfo> authInfos, boolean sslEnabled) throws IOException {
  // No explicit Host:Port from this entry point.
  final String unspecifiedHostPort = null;
  this.start(authInfos, sslEnabled, unspecifiedHostPort);
}

/**
* Start the connection to the ZooKeeper ensemble.
*
* @param authInfos List of authentication keys.
* @param sslEnabled If the connection should be SSL/TLS encrypted.
* @param zkHostPort Host:Port of the ZooKeeper. If null, the value configured
* under {@link CommonConfigurationKeys#ZK_ADDRESS} is used instead.
* @throws IOException If the connection cannot be started.
*/
public void start(List<AuthInfo> authInfos, boolean sslEnabled)
throws IOException{
public void start(List<AuthInfo> authInfos, boolean sslEnabled, String zkHostPort)
throws IOException {

ZKClientConfig zkClientConfig = new ZKClientConfig();

// Connect to the ZooKeeper ensemble
String zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
if (zkHostPort == null) {
throw new IOException(
CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
if (zkHostPort == null) {
throw new IOException(
CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
}
LOG.debug("Configured {} as {}", CommonConfigurationKeys.ZK_ADDRESS, zkHostPort);
}
LOG.debug("Configured {} as {}", CommonConfigurationKeys.ZK_ADDRESS, zkHostPort);

int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public void setup() throws Exception {
DELETE_DATA_DIRECTORY_ON_CLOSE, SERVER_ID, TICK_TIME, MAX_CLIENT_CNXNS,
customConfiguration);
this.server = new TestingServer(spec, true);
this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
String zkHostPort = this.server.getConnectString();
this.curator = new ZKCuratorManager(this.hadoopConf);
this.curator.start(new ArrayList<>(), true);
this.curator.start(new ArrayList<>(), true, zkHostPort);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ public void setup() throws Exception {
this.server = new TestingServer();

Configuration conf = new Configuration();
conf.set(
CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
String zkHostPort = this.server.getConnectString();

this.curator = new ZKCuratorManager(conf);
this.curator.start();
this.curator.start(zkHostPort);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads";
public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT =
-1;
// Configuration key for the ZooKeeper address (Host:Port) of the State Store
// ZooKeeper driver; the driver's initDriver() reads it and passes the value
// to ZKCuratorManager#start(String).
public static final String FEDERATION_STORE_ZK_ADDRESS =
FEDERATION_STORE_ZK_DRIVER_PREFIX + "address";

// HDFS Router-based federation File based store implementation specific configs
public static final String FEDERATION_STORE_FILE_ASYNC_THREADS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ public boolean initDriver() {
} else {
LOG.info("Init StateStoreZookeeperImpl by sync mode.");
}
String zkHostPort = conf.get(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS);
try {
this.zkManager = new ZKCuratorManager(conf);
this.zkManager.start();
this.zkManager.start(zkHostPort);
this.zkAcl = ZKCuratorManager.getZKAcls(conf);
} catch (IOException e) {
LOG.error("Cannot initialize the ZK connection", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,13 @@
</description>
</property>

<property>
<name>dfs.federation.router.store.driver.zk.address</name>
<description>
Host:Port of the ZooKeeper ensemble used by StateStoreZooKeeperImpl.
If this is not set, the address configured in hadoop.zk.address is used.
</description>
</property>

<property>
<name>dfs.federation.router.store.driver.zk.parent-path</name>
<value>/hdfs-federation</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
Expand Down Expand Up @@ -74,7 +73,7 @@ public void setup() throws Exception {
.retryPolicy(new RetryNTimes(100, 100))
.build();
curatorFramework.start();
routerConfig.set(CommonConfigurationKeys.ZK_ADDRESS, connectStr);
routerConfig.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectStr);
router.init(routerConfig);
router.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
Expand Down Expand Up @@ -81,7 +80,7 @@ public static void setUp() throws Exception {
conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
FileSubclusterResolver.class);
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
cluster.addRouterOverrides(conf);
cluster.startCluster();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
Expand Down Expand Up @@ -86,7 +85,7 @@ public static void setUp() throws Exception {
conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
FileSubclusterResolver.class);
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
cluster = new MiniRouterDFSCluster(false, numNameservices, conf);
cluster.addRouterOverrides(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
Expand Down Expand Up @@ -71,7 +70,7 @@ public static void setupCluster() throws Exception {
// Create the ZK State Store
Configuration conf =
getStateStoreConfiguration(StateStoreZooKeeperImpl.class);
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
// Disable auto-repair of connection
conf.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
TimeUnit.HOURS.toMillis(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ private static void addDeprecatedKeys() {
SYSTEM_METRICS_PUBLISHER_ENABLED),
new DeprecationDelta(RM_ZK_ACL, CommonConfigurationKeys.ZK_ACL),
new DeprecationDelta(RM_ZK_AUTH, CommonConfigurationKeys.ZK_AUTH),
new DeprecationDelta(RM_ZK_ADDRESS,
CommonConfigurationKeys.ZK_ADDRESS),
new DeprecationDelta(RM_ZK_NUM_RETRIES,
CommonConfigurationKeys.ZK_NUM_RETRIES),
new DeprecationDelta(RM_ZK_TIMEOUT_MS,
Expand Down Expand Up @@ -4038,6 +4036,9 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS =
"org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore";

// Configuration key for the ZooKeeper address (Host:Port) of the federation
// state store; the ZooKeeper-based store reads it in init(Configuration) and
// passes the value to ZKCuratorManager#start(String).
public static final String FEDERATION_STATESTORE_ZK_ADDRESS =
FEDERATION_PREFIX + "state-store.zk.address";

public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
// 5 minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public void initializeMemberVariables() {
.add(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED);

// skip deprecated ZooKeeper settings
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_ADDRESS);
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_NUM_RETRIES);
configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_TIMEOUT_MS);
configurationPropsToSkipCompare.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,15 @@
<value>${yarn.resourcemanager.max-completed-applications}</value>
</property>

<property>
<description>Host:Port of the ZooKeeper server to be used by the RM. This
must be supplied when using the ZooKeeper based implementation of the
RM state store and/or embedded automatic failover in an HA setting.
</description>
<name>yarn.resourcemanager.zk-address</name>
<!--value>127.0.0.1:2181</value-->
</property>

<property>
<description>Full path of the ZooKeeper znode where RM state will be
stored. This must be supplied when using
Expand Down Expand Up @@ -3798,6 +3807,13 @@
<value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore</value>
</property>

<property>
<description>
Host:Port of the ZooKeeper server to be used by the federation state store.
If this is not set, the address configured in hadoop.zk.address is used.
</description>
<name>yarn.federation.state-store.zk.address</name>
</property>

<property>
<description>
The time in seconds after which the federation state store local cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,10 @@ public void init(Configuration conf) throws YarnException {
baseZNode = conf.get(
YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
String zkHostPort = conf.get(YarnConfiguration.FEDERATION_STATESTORE_ZK_ADDRESS);
try {
this.zkManager = new ZKCuratorManager(conf);
this.zkManager.start();
this.zkManager.start(zkHostPort);
} catch (IOException e) {
LOG.error("Cannot initialize the ZK connection", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.impl.MetricsRecords;
Expand Down Expand Up @@ -94,7 +93,7 @@ public void before() throws IOException, YarnException {
curatorFramework.start();

Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
conf.set(YarnConfiguration.FEDERATION_STATESTORE_ZK_ADDRESS, connectString);
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
setConf(conf);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ protected EmbeddedElector createEmbeddedElector() throws IOException {
*/
public ZKCuratorManager createAndStartZKManager(Configuration
config) throws IOException {
String zkHostPort = config.get(YarnConfiguration.RM_ZK_ADDRESS);
ZKCuratorManager manager = new ZKCuratorManager(config);

// Get authentication
Expand All @@ -432,7 +433,7 @@ public ZKCuratorManager createAndStartZKManager(Configuration
config.getBoolean(CommonConfigurationKeys.ZK_CLIENT_SSL_ENABLED,
config.getBoolean(YarnConfiguration.RM_ZK_CLIENT_SSL_ENABLED,
YarnConfiguration.DEFAULT_RM_ZK_CLIENT_SSL_ENABLED));
manager.start(authInfos, isSSLEnabled);
manager.start(authInfos, isSSLEnabled, zkHostPort);
return manager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void setup() throws Exception {
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
configuration.set(YarnConfiguration.RM_STORE,
ZKRMStateStore.class.getName());
configuration.set(CommonConfigurationKeys.ZK_ADDRESS, hostPort);
configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
configuration.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private String getDelegationTokenNode(int rmDTSequenceNumber, int splitIdx) {

private RMStateStore createStore(Configuration conf) throws Exception {
workingZnode = "/jira/issue/3077/rmstore";
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
Expand Down Expand Up @@ -347,7 +347,7 @@ public Version getCurrentVersion() throws Exception {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
Expand Down Expand Up @@ -388,7 +388,7 @@ public static Configuration createHARMConf(String rmIds, String rmId,
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestServer.getConnectString());
conf.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
conf.set(YarnConfiguration.RM_HA_ID, rmId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public TestZKRMStateStore(Configuration conf, String workingZnode)

public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test";
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
testingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.store = new TestZKRMStateStore(conf, workingZnode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
Expand Down Expand Up @@ -100,7 +99,7 @@ public void setUp() throws Exception {
curatorTestingServer = setupCuratorServer();
curatorFramework = setupCuratorFramework(curatorTestingServer);

conf.set(CommonConfigurationKeys.ZK_ADDRESS,
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
rm = new MockRM(conf);
rm.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ ZooKeeper: one must set the ZooKeeper settings for Hadoop:
| Property | Example | Description |
|:------------------------------------|:------------------------------------------------------------------------------------|:----------------------------------------|
| `yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore` | The type of state-store to use. |
| `hadoop.zk.address` | `host:port` | The address for the ZooKeeper ensemble. |
| `yarn.federation.state-store.zk.address` | `host:port` | The address for the ZooKeeper ensemble. |

SQL: one must setup the following parameters:

Expand Down Expand Up @@ -1006,7 +1006,7 @@ Example of Machine-Role Mapping(Exclude HDFS):

<!-- ZK Address. -->
<property>
<name>hadoop.zk.address</name>
<name>yarn.federation.state-store.zk.address</name>
<value>zkHost:zkPort</value>
</property>

Expand Down Expand Up @@ -1067,7 +1067,7 @@ $HADOOP_HOME/bin/yarn --daemon start resourcemanager

<!-- ZK Address. -->
<property>
<name>hadoop.zk.address</name>
<name>yarn.federation.state-store.zk.address</name>
<value>zkHost:zkPort</value>
</property>

Expand Down Expand Up @@ -1135,7 +1135,7 @@ After we have finished configuring the `YARN-2` cluster, we can proceed with sta

<!-- ZK Address. -->
<property>
<name>hadoop.zk.address</name>
<name>yarn.federation.state-store.zk.address</name>
<value>zkHost:zkPort</value>
</property>

Expand Down
Loading

0 comments on commit 6c08e8e

Please sign in to comment.