Skip to content

Commit

Permalink
[hive] bugfix jdbc paimon hive client pool (apache#3004)
Browse files Browse the repository at this point in the history
  • Loading branch information
坤泰 committed Aug 20, 2024
1 parent 154126a commit fcbee80
Show file tree
Hide file tree
Showing 3 changed files with 573 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -64,6 +65,7 @@
public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {

private static final String CONF_ELEMENT_PREFIX = "conf:";
private static final String CONF_OPTIONS_ALL = "*";

private static Cache<Key, HiveClientPool> clientPoolCache;

Expand All @@ -77,15 +79,35 @@ public CachedClientPool(Configuration conf, Options options, String clientClassN
this.conf = conf;
this.clientPoolSize = options.get(CLIENT_POOL_SIZE);
this.evictionInterval = options.get(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS);
this.key = extractKey(options.get(CLIENT_POOL_CACHE_KEYS), conf);
this.key = extractKey(options.get(CLIENT_POOL_CACHE_KEYS), conf, options);
this.clientClassName = clientClassName;
init();
}

@VisibleForTesting
HiveClientPool clientPool() {
return clientPoolCache.get(
key, k -> new HiveClientPool(clientPoolSize, conf, clientClassName));
key,
k ->
new HiveClientPool(
clientPoolSize, conf, clientClassName, this.getCurrentUser(k)));
}

private UserGroupInformation getCurrentUser(Key key) {
try {
for (Object elem : key.elements) {
if (elem instanceof UserGroupInformationConf) {
return ((UserGroupInformationConf) elem).ugi;
}
if (elem instanceof UserNameConf) {
return UserGroupInformation.createProxyUser(
((UserNameConf) elem).userName, UserGroupInformation.getCurrentUser());
}
}
return UserGroupInformation.getCurrentUser();
} catch (Exception e) {
return null;
}
}

private synchronized void init() {
Expand Down Expand Up @@ -142,7 +164,7 @@ public void execute(ExecuteAction<IMetaStoreClient, TException> action, boolean
}

@VisibleForTesting
static Key extractKey(String cacheKeys, Configuration conf) {
static Key extractKey(String cacheKeys, Configuration conf, Options options) {
// generate key elements in a certain order, so that the Key instances are comparable
List<Object> elements = Lists.newArrayList();
elements.add(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""));
Expand All @@ -157,12 +179,15 @@ static Key extractKey(String cacheKeys, Configuration conf) {
String trimmed = element.trim();
if (trimmed.toLowerCase(Locale.ROOT).startsWith(CONF_ELEMENT_PREFIX)) {
String key = trimmed.substring(CONF_ELEMENT_PREFIX.length());

Preconditions.checkArgument(
!confElements.containsKey(key),
"Conf key element %s already specified",
key);
confElements.put(key, conf.get(key));
String value = CONF_OPTIONS_ALL;
if (!CONF_OPTIONS_ALL.equals(key)) {
Preconditions.checkArgument(
!confElements.containsKey(key),
"Conf key element %s already specified",
key);
value = confElements.get(key);
}
confElements.put(key, value);
} else {
KeyElementType type = KeyElementType.valueOf(trimmed.toUpperCase());
switch (type) {
Expand All @@ -183,7 +208,8 @@ static Key extractKey(String cacheKeys, Configuration conf) {
switch (type) {
case UGI:
try {
elements.add(UserGroupInformation.getCurrentUser());
elements.add(
UserGroupInformationConf.of(UserGroupInformation.getCurrentUser()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand All @@ -200,7 +226,11 @@ static Key extractKey(String cacheKeys, Configuration conf) {
}
}
for (String key : confElements.keySet()) {
elements.add(ConfElement.of(key, confElements.get(key)));
if (CONF_OPTIONS_ALL.equals(confElements.get(key))) {
elements.add(options);
} else {
elements.add(ConfElement.of(key, confElements.get(key)));
}
}
return Key.of(elements);
}
Expand All @@ -219,6 +249,38 @@ public List<Object> elements() {
public static Key of(List<Object> elements) {
return new Key(elements);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final Key that = (Key) o;
if (this.elements.size() != that.elements.size()) {
return false;
}
for (int i = 0; i < elements.size(); i++) {
if (!Objects.equals(this.elements.get(i), that.elements.get(i))) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
int hashCode = 0;
synchronized (elements) {
for (Object p : elements) {
hashCode ^= p.hashCode();
}
}
return hashCode;
}
}

static class ConfElement {
Expand All @@ -239,11 +301,87 @@ public String value() {
return value;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
ConfElement other = (ConfElement) obj;
return Objects.equals(key, other.key) && Objects.equals(value, other.value);
}

@Override
public int hashCode() {
return Objects.hash(key, value);
}

public static ConfElement of(String key, String value) {
return new ConfElement(key, value);
}
}

static class UserGroupInformationConf {

private final UserGroupInformation ugi;

private UserGroupInformationConf(UserGroupInformation ugi) {
this.ugi = ugi;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
UserGroupInformationConf other = (UserGroupInformationConf) obj;
return Objects.equals(other.ugi.getUserName(), ugi.getUserName());
}

@Override
public int hashCode() {
return Objects.hash(ugi.getUserName());
}

public static UserGroupInformationConf of(UserGroupInformation ugi) {
return new UserGroupInformationConf(ugi);
}
}

static class UserNameConf {
private final String userName;

private UserNameConf(String userName) {
this.userName = userName;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
UserNameConf other = (UserNameConf) obj;
return Objects.equals(other.userName, userName);
}

@Override
public int hashCode() {
return Objects.hash(userName);
}

public static UserNameConf of(String userName) {
return new UserNameConf(userName);
}
}

private enum KeyElementType {
UGI,
USER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;

/**
* Pool of Hive Metastore clients.
*
Expand All @@ -38,26 +43,49 @@ public class HiveClientPool extends ClientPool.ClientPoolImpl<IMetaStoreClient,

private final HiveConf hiveConf;
private final String clientClassName;
private final UserGroupInformation ugi;

public HiveClientPool(int poolSize, Configuration conf, String clientClassName) {
this(poolSize, conf, clientClassName, null);
}

public HiveClientPool(
int poolSize, Configuration conf, String clientClassName, UserGroupInformation ugi) {
// Do not allow retry by default as we rely on RetryingHiveClient
super(poolSize, TTransportException.class, false);
this.hiveConf = new HiveConf(conf, HiveClientPool.class);
this.hiveConf.addResource(conf);
this.clientClassName = clientClassName;
this.ugi = ugi;
}

@Override
protected IMetaStoreClient newClient() {
return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
return this.ugi != null
? this.ugi.doAs(
(PrivilegedAction<IMetaStoreClient>)
() ->
new RetryingMetaStoreClientFactory()
.createClient(hiveConf, clientClassName))
: new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
}

@Override
protected IMetaStoreClient reconnect(IMetaStoreClient client) {
try {
client.close();
client.reconnect();
} catch (MetaException e) {
if (this.ugi != null) {
this.ugi.doAs(
(PrivilegedExceptionAction<Void>)
() -> {
client.close();
client.reconnect();
return null;
});
} else {
client.close();
client.reconnect();
}
} catch (MetaException | IOException | InterruptedException e) {
throw new RuntimeException("Failed to reconnect to Hive Metastore", e);
}
return client;
Expand Down
Loading

0 comments on commit fcbee80

Please sign in to comment.