Skip to content

Commit

Permalink
[hive] bugfix jdbc paimon hive client pool (apache#3004)
Browse files Browse the repository at this point in the history
  • Loading branch information
坤泰 committed Aug 22, 2024
1 parent fcbee80 commit 358a07e
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.paimon.hive.HiveCatalogOptions.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS;
import static org.apache.paimon.hive.HiveCatalogOptions.CLIENT_POOL_CACHE_KEYS;
Expand Down Expand Up @@ -82,32 +83,14 @@ public CachedClientPool(Configuration conf, Options options, String clientClassN
this.key = extractKey(options.get(CLIENT_POOL_CACHE_KEYS), conf, options);
this.clientClassName = clientClassName;
init();
// set ugi information to hms client
this.clientPool();
}

@VisibleForTesting
HiveClientPool clientPool() {
return clientPoolCache.get(
key,
k ->
new HiveClientPool(
clientPoolSize, conf, clientClassName, this.getCurrentUser(k)));
}

private UserGroupInformation getCurrentUser(Key key) {
try {
for (Object elem : key.elements) {
if (elem instanceof UserGroupInformationConf) {
return ((UserGroupInformationConf) elem).ugi;
}
if (elem instanceof UserNameConf) {
return UserGroupInformation.createProxyUser(
((UserNameConf) elem).userName, UserGroupInformation.getCurrentUser());
}
}
return UserGroupInformation.getCurrentUser();
} catch (Exception e) {
return null;
}
key, k -> new HiveClientPool(clientPoolSize, conf, clientClassName));
}

private synchronized void init() {
Expand Down Expand Up @@ -281,6 +264,11 @@ public int hashCode() {
}
return hashCode;
}

@Override
public String toString() {
return elements.stream().map(Object::toString).collect(Collectors.joining(","));
}
}

static class ConfElement {
Expand Down Expand Up @@ -353,35 +341,6 @@ public static UserGroupInformationConf of(UserGroupInformation ugi) {
}
}

static class UserNameConf {
private final String userName;

private UserNameConf(String userName) {
this.userName = userName;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
UserNameConf other = (UserNameConf) obj;
return Objects.equals(other.userName, userName);
}

@Override
public int hashCode() {
return Objects.hash(userName);
}

public static UserNameConf of(String userName) {
return new UserNameConf(userName);
}
}

private enum KeyElementType {
UGI,
USER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;

/**
* Pool of Hive Metastore clients.
*
Expand All @@ -43,49 +38,32 @@ public class HiveClientPool extends ClientPool.ClientPoolImpl<IMetaStoreClient,

private final HiveConf hiveConf;
private final String clientClassName;
private final UserGroupInformation ugi;

public HiveClientPool(int poolSize, Configuration conf, String clientClassName) {
this(poolSize, conf, clientClassName, null);
}

public HiveClientPool(
int poolSize, Configuration conf, String clientClassName, UserGroupInformation ugi) {
// Do not allow retry by default as we rely on RetryingHiveClient
super(poolSize, TTransportException.class, false);
this.hiveConf = new HiveConf(conf, HiveClientPool.class);
this.hiveConf.addResource(conf);
this.clientClassName = clientClassName;
this.ugi = ugi;
// set ugi information to hms client
try {
this.run(client -> null);
} catch (Exception e) {
throw new RuntimeException("Failed to connect to Hive Metastore", e);
}
}

@Override
protected IMetaStoreClient newClient() {
return this.ugi != null
? this.ugi.doAs(
(PrivilegedAction<IMetaStoreClient>)
() ->
new RetryingMetaStoreClientFactory()
.createClient(hiveConf, clientClassName))
: new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
}

@Override
protected IMetaStoreClient reconnect(IMetaStoreClient client) {
try {
if (this.ugi != null) {
this.ugi.doAs(
(PrivilegedExceptionAction<Void>)
() -> {
client.close();
client.reconnect();
return null;
});
} else {
client.close();
client.reconnect();
}
} catch (MetaException | IOException | InterruptedException e) {
client.close();
client.reconnect();
} catch (MetaException e) {
throw new RuntimeException("Failed to reconnect to Hive Metastore", e);
}
return client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@

package org.apache.paimon.hive.pool;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.options.Options;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.UUID;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link CachedClientPool}. */
Expand Down Expand Up @@ -303,7 +311,7 @@ public void testCacheKeyOptionAllSame() {
}

@Test
public void testLoginPaimon() throws TException {
public void testLoginPaimon() throws Exception {
// user paimon login
Options options = new Options();
options.set("type", "paimon");
Expand All @@ -328,11 +336,11 @@ public void testLoginPaimon() throws TException {
HiveMetaStoreClient.class.getName()));

// contain default database
assertThat(cache.clientPool().newClient().getAllDatabases()).contains("default");
assertThat(cache.run(IMetaStoreClient::getAllDatabases)).contains("default");
}

@Test
public void testLoginRoot() throws TException {
public void testLoginRoot() throws Exception {
// user paimon login
Options options = new Options();
options.set("type", "paimon");
Expand All @@ -355,7 +363,72 @@ public void testLoginRoot() throws TException {
HiveMetaStoreClient.class.getName()));

// contain default database
assertThat(cache.clientPool().newClient().getAllDatabases()).contains("default");
assertThat(cache.run(IMetaStoreClient::getAllDatabases)).contains("default");
}

@Test
public void testLoginHive() throws Exception {
Catalog catalog =
UserGroupInformation.createRemoteUser("hive")
.doAs(
(PrivilegedAction<Catalog>)
() -> {
try {
// paimon options
Options options = new Options();
options.set("type", "paimon");
options.set("paimon.catalog.type", "hive");
options.set(
"hive.metastore.uris",
"thrift://30.150.24.155:9083");
options.set("client-pool-cache.keys", "ugi,conf:*");

// hive config
HiveConf hiveConf = new HiveConf();
hiveConf.set("current.user", "hive");
hiveConf.set(
"hive.metastore.filter.hook",
MockAuthorizationMetaStoreFilterHook.class
.getName());

// tempDir
File tempDir =
File.createTempFile(
"paimon-", "-hive.catalog");
tempDir.delete();
tempDir.mkdir();
tempDir.deleteOnExit();
String warehouse = tempDir.getAbsolutePath();

CatalogContext catalogContext =
CatalogContext.create(options);
FileIO fileIO =
FileIO.get(
new Path(warehouse),
catalogContext);

String jdoConnectionURL =
"jdbc:derby:memory:" + UUID.randomUUID();
hiveConf.setVar(
METASTORECONNECTURLKEY,
jdoConnectionURL + ";create=true");
String metastoreClientClass =
"org.apache.hadoop.hive.metastore.HiveMetaStoreClient";

return new HiveCatalog(
fileIO,
hiveConf,
metastoreClientClass,
options,
warehouse);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
;

// contain default database
assertThat(catalog.listDatabases()).contains("default");
}

/** Tests for {@link CachedClientPool}. */
Expand Down

0 comments on commit 358a07e

Please sign in to comment.