Skip to content

Commit

Permalink
[KYUUBI #6034] Kyuubi Server HA&ZK get server from serverHosts suppor…
Browse files Browse the repository at this point in the history
…t more strategy

# 🔍 Description
## Issue References 🔗

This pull request fixes #6034

## Describe Your Solution 🔧
Currently, use beeline to connect kyuubiServer with HA mode, the strategy only support random, this will lead to a high load on the machine. So i make this pr to support choose strategy.
[description]
First, we need know, beeline connect kyuubiServer dependency on kyuubi-hive-jdbc, it is isolated from the kyuubi cluster, so the code only support random choose serverHost from zk node /${namespace}. Because kyuubi-hive-jdbc is a stateless module, only run once, cannot store var about get serverHost from zk node.
[Solution]
This pr, we could implement a interface named ChooseServerStrategy to choose serverHost. I implement two strategy
1. poll: it will create a zk node named ${namespace}-counter, when a beeline client want connect kyuubiServer, the node will increment 1, use this value to take the remainder from serverHosts, like counter % serverHost.size, so we could get a order serverHost
2. random: random get serverHost from serverHosts
3. User Definied Class: implemented the ChooseServerStrategy, then put the jar to beeline-jars, it can use your strategy to choose serverHost

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪
Test the Strategy in my test Cluster
#### Behavior Without This Pull Request ⚰️
![image](https://github.com/apache/kyuubi/assets/51512358/d65b14c1-1b02-4436-8843-27b2e55d27ce)
![image](https://github.com/apache/kyuubi/assets/51512358/0524a30c-c2c3-464e-8453-84f3f1a74fb1)
![image](https://github.com/apache/kyuubi/assets/51512358/12feb93e-b743-4a43-821d-454f3c1af336)

#### Behavior With This Pull Request 🎉

[Use Case]
1. poll: `bin/beeline -u 'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=poll?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true' -n mfw_hadoop --verbose=true --showNestedErrs=true`
2. random: `bin/beeline -u 'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=random?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true' -n mfw_hadoop --verbose=true --showNestedErrs=true` or `bin/beeline -u 'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true' -n mfw_hadoop --verbose=true --showNestedErrs=true`
3. YourStrategy: `bin/beeline -u 'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=xxx.xxx.xxx.XxxChooseServerStrategy?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true' -n mfw_hadoop --verbose=true --showNestedErrs=true`

[Result: The Cluster have two Server (221,233)]
1. poll:
1.1. zkNode: counterValue
![image](https://github.com/apache/kyuubi/assets/51512358/5cbd15f9-bba4-4b23-bbfb-d61ed46f931f)

1.2. result:
![image](https://github.com/apache/kyuubi/assets/51512358/5a867167-8b06-49ed-aa44-b70726f3ae97)
![image](https://github.com/apache/kyuubi/assets/51512358/404b05e8-c828-458c-a9c4-97a323bf6ce7)
![image](https://github.com/apache/kyuubi/assets/51512358/3182e92b-6976-4931-a899-5e0d89cd2ac2)
![image](https://github.com/apache/kyuubi/assets/51512358/a55450ff-49cf-4b4a-9b90-91dd02982aa5)

2. random:
![image](https://github.com/apache/kyuubi/assets/51512358/d65b14c1-1b02-4436-8843-27b2e55d27ce)
![image](https://github.com/apache/kyuubi/assets/51512358/0524a30c-c2c3-464e-8453-84f3f1a74fb1)
![image](https://github.com/apache/kyuubi/assets/51512358/12feb93e-b743-4a43-821d-454f3c1af336)

3. YourStrategy(the test case only get the first serverHost):
![image](https://github.com/apache/kyuubi/assets/51512358/2e6395c2-6496-4516-9cf6-90abc921de7f)
![image](https://github.com/apache/kyuubi/assets/51512358/72975513-48d2-4f41-8a95-95cde0302c5b)
![image](https://github.com/apache/kyuubi/assets/51512358/487951fd-de45-4e1c-861a-94e0e5564e37)

#### Related Unit Tests

There is no Unit Tests.
---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6213 from davidyuan1223/ha_zk_support_more_strategy.

Closes #6034

961d3e9 [Bowen Liang] rename ServerStrategyFactory to ServerSelectStrategyFactory
353f940 [Bowen Liang] repeat
8822ad4 [Bowen Liang] repeat
6193394 [Bowen Liang] nit
e94f9e9 [Bowen Liang] nit
40f427a [Bowen Liang] rename StrategyFactory to StrategyFactoryServerStrategyFactory
7668f99 [Bowen Liang] test name
e194ea6 [Bowen Liang] remove ZooKeeperHiveClientException from method signature of chooseServer
265965e [Bowen Liang] polling
b39c567 [Bowen Liang] style
1ab79b4 [Bowen Liang] strategyName
8f8ca28 [Bowen Liang] nit
228bf10 [Bowen Liang] rename parameter zooKeeperStrategy to serverSelectStrategy
125c823 [Bowen Liang] rename ChooseServerStrategy to ServerSelectStrategy
b4aeb3d [Bowen Liang] repeat testing on pollingChooseStrategy
4655480 [davidyuan] update
09a84f1 [david yuan] remove the distirbuted lock
93f4a26 [davidyuan] remove reset
7b0c1b8 [davidyuan] fix var not valid and counter getAndIncrement
c95382a [davidyuan] fix var not valid and counter getAndIncrement
9ed2cac [david yuan] remove test comment
8eddd76 [davidyuan] Add Strategy Unit Test Case and fix the polling strategy counter begin with 0
73952f8 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts support more strategy
97b9597 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts support more strategy
ee5a9ad [davidyuan] Kyuubi Server HA&ZK get server from serverHosts support more strategy
6a04453 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts support more strategy
1892f14 [davidyuan] add common method to get session level config
7c0c605 [yuanfuyuan] fix_4186

Lead-authored-by: Bowen Liang <[email protected]>
Co-authored-by: davidyuan <[email protected]>
Co-authored-by: davidyuan <[email protected]>
Co-authored-by: david yuan <[email protected]>
Co-authored-by: yuanfuyuan <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
(cherry picked from commit 8862767)
Signed-off-by: Bowen Liang <[email protected]>
  • Loading branch information
4 people committed Oct 23, 2024
1 parent 4c268f4 commit 675bc55
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 3 deletions.
7 changes: 7 additions & 0 deletions kyuubi-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-hive-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client._
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider._
import org.apache.kyuubi.jdbc.hive.strategy.{ServerSelectStrategy, ServerSelectStrategyFactory}
import org.apache.kyuubi.service._
import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory
import org.apache.kyuubi.shaded.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry
import org.apache.kyuubi.shaded.zookeeper.ZooDefs
import org.apache.kyuubi.shaded.zookeeper.data.ACL
Expand Down Expand Up @@ -227,4 +228,41 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
discovery.stop()
}
}

test("server select strategy with zookeeper") {
val zkClient = CuratorFrameworkFactory.builder()
.connectString(getConnectString)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build
zkClient.start()

val namespace = "kyuubi-strategy-test"
val testServerHosts = Seq(
"testNode1",
"testNode2",
"testNode3").asJava
// test polling strategy
val pollingStrategy = ServerSelectStrategyFactory.createStrategy("polling")
1 to testServerHosts.size() * 2 foreach { _ =>
assertResult(f"testNode1")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
assertResult(f"testNode2")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
assertResult(f"testNode3")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
}

// test only get first serverHost strategy
val customStrategy = new ServerSelectStrategy {
override def chooseServer(
serverHosts: util.List[String],
zkClient: CuratorFramework,
namespace: String): String = serverHosts.get(0)
}
1 to testServerHosts.size() * 2 foreach { _ =>
assertResult("testNode1") {
customStrategy.chooseServer(testServerHosts, zkClient, namespace)
}
}

zkClient.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class JdbcConnectionParams {
// Use ZooKeeper for indirection while using dynamic service discovery
static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
static final String SERVER_SELECT_STRATEGY = "serverSelectStrategy";
// Default namespace value on ZooKeeper.
// This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategyFactory;
import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory;
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry;
Expand Down Expand Up @@ -111,7 +113,7 @@ static void configureConnParams(JdbcConnectionParams connParams)
try (CuratorFramework zooKeeperClient = getZkClient(connParams)) {
List<String> serverHosts = getServerHosts(connParams, zooKeeperClient);
// Now pick a server node randomly
String serverNode = serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
String serverNode = chooseServer(connParams, serverHosts, zooKeeperClient);
updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode);
} catch (Exception e) {
throw new ZooKeeperHiveClientException(
Expand All @@ -120,6 +122,22 @@ static void configureConnParams(JdbcConnectionParams connParams)
// Close the client connection with ZooKeeper
}

private static String chooseServer(
JdbcConnectionParams connParams, List<String> serverHosts, CuratorFramework zkClient) {
String zooKeeperNamespace = getZooKeeperNamespace(connParams);
String strategyName =
connParams
.getSessionVars()
.getOrDefault(
JdbcConnectionParams.SERVER_SELECT_STRATEGY, RandomSelectStrategy.strategyName);
try {
ServerSelectStrategy strategy = ServerSelectStrategyFactory.createStrategy(strategyName);
return strategy.chooseServer(serverHosts, zkClient, zooKeeperNamespace);
} catch (Exception e) {
throw new RuntimeException("Failed to choose server with strategy " + strategyName, e);
}
}

static List<JdbcConnectionParams> getDirectParamsList(JdbcConnectionParams connParams)
throws ZooKeeperHiveClientException {
try (CuratorFramework zooKeeperClient = getZkClient(connParams)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.jdbc.hive.strategy;

import java.util.List;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;

public interface ServerSelectStrategy {
String chooseServer(List<String> serverHosts, CuratorFramework zkClient, String namespace);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.jdbc.hive.strategy;

import java.lang.reflect.Constructor;
import org.apache.kyuubi.jdbc.hive.strategy.zk.PollingSelectStrategy;
import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy;

public class ServerSelectStrategyFactory {
public static ServerSelectStrategy createStrategy(String strategyName) {
try {
switch (strategyName) {
case PollingSelectStrategy.strategyName:
return new PollingSelectStrategy();
case RandomSelectStrategy.strategyName:
return new RandomSelectStrategy();
default:
Class<?> clazz = Class.forName(strategyName);
if (ServerSelectStrategy.class.isAssignableFrom(clazz)) {
Constructor<? extends ServerSelectStrategy> constructor =
clazz.asSubclass(ServerSelectStrategy.class).getConstructor();
return constructor.newInstance();
} else {
throw new ClassNotFoundException(
"The loaded class does not implement ServerSelectStrategy");
}
}
} catch (Exception e) {
throw new RuntimeException("Failed to init server select strategy", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.jdbc.hive.strategy.zk;

import java.util.List;
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.AtomicValue;
import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.kyuubi.shaded.curator.retry.RetryForever;

public class PollingSelectStrategy implements ServerSelectStrategy {
public static final String strategyName = "polling";

private static final String COUNTER_PATH_PREFIX = "/";
private static final String COUNTER_PATH_SUFFIX = "-counter";

@Override
public String chooseServer(
List<String> serverHosts, CuratorFramework zkClient, String namespace) {
String counterPath = COUNTER_PATH_PREFIX + namespace + COUNTER_PATH_SUFFIX;
try {
return serverHosts.get(getAndIncrement(zkClient, counterPath) % serverHosts.size());
} catch (Exception e) {
throw new RuntimeException("Failed to choose server by polling select strategy", e);
}
}

private int getAndIncrement(CuratorFramework zkClient, String path) throws Exception {
DistributedAtomicInteger dai =
new DistributedAtomicInteger(zkClient, path, new RetryForever(3000));
AtomicValue<Integer> atomicVal;
do {
atomicVal = dai.add(1);
} while (atomicVal == null || !atomicVal.succeeded());
return atomicVal.preValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.jdbc.hive.strategy.zk;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;

public class RandomSelectStrategy implements ServerSelectStrategy {
public static final String strategyName = "random";

@Override
public String chooseServer(
List<String> serverHosts, CuratorFramework zkClient, String namespace) {
return serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
}
}

0 comments on commit 675bc55

Please sign in to comment.