Setting the client's pool cache eviction interval (ms).

client-pool-cache.keys
(none)
String
Specify the client cache key; multiple elements are separated by commas.
- "ugi": the Hadoop UserGroupInformation instance that represents the current user using the cache.
- "user_name": similar to "ugi", but only includes the user's name as determined by UserGroupInformation#getUserName.
- "conf": the name of an arbitrary configuration. The value of the configuration is extracted from the catalog properties and added to the cache key. A conf element must start with the "conf:" prefix, followed by the configuration name; e.g. "conf:a.b.c" adds "a.b.c" to the key, so that catalogs with different values for that configuration do not share the same client pool. Multiple conf elements can be specified.
hadoop-conf-dir
(none)
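
As a usage sketch (not part of this change), the new options are supplied together with the usual Hive catalog properties. Only the two client-pool-cache.* keys below come from this change; the metastore, uri and warehouse entries are placeholder values for illustration.

import java.util.HashMap;
import java.util.Map;

// Hypothetical catalog properties; only the client-pool-cache.* keys are introduced by this change.
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put("metastore", "hive");
catalogProperties.put("uri", "thrift://hms-host:9083");                     // placeholder metastore URI
catalogProperties.put("warehouse", "hdfs:///path/to/warehouse");            // placeholder warehouse path
catalogProperties.put("client-pool-cache.eviction-interval-ms", "300000");  // evict idle client pools after 5 minutes
catalogProperties.put("client-pool-cache.keys", "user_name,conf:a.b.c");    // pool cache key = user name + value of "a.b.c"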
diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
index deddf75a2452..b437e68935e2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
@@ -33,15 +33,24 @@
/** Client pool for using multiple clients to execute actions. */
public interface ClientPool<C, E extends Exception> {
-    /** Action interface for client. */
+    /** Action interface with a return value for the client. */
    interface Action<R, C, E extends Exception> {
        R run(C client) throws E;
    }
+    /** Action interface with a void return for the client. */
+    interface ExecuteAction<C, E extends Exception> {
+        void run(C client) throws E;
+    }
+
    <R> R run(Action<R, C, E> action) throws E, InterruptedException;
    <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;
+    void execute(ExecuteAction<C, E> action) throws E, InterruptedException;
+
+    void execute(ExecuteAction<C, E> action, boolean retry) throws E, InterruptedException;
+
    /** Default implementation for {@link ClientPool}. */
    abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
        private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
@@ -93,6 +102,36 @@ public R run(Action action, boolean retry) throws E, InterruptedExc
}
}
+    @Override
+    public void execute(ExecuteAction<C, E> action) throws E, InterruptedException {
+        execute(action, retryByDefault);
+    }
+
+    @Override
+    public void execute(ExecuteAction<C, E> action, boolean retry)
+            throws E, InterruptedException {
+        C client = get();
+        try {
+            action.run(client);
+        } catch (Exception exc) {
+            if (retry && isConnectionException(exc)) {
+                try {
+                    client = reconnect(client);
+                } catch (Exception ignored) {
+                    // if reconnection throws any exception, rethrow the original failure
+                    throw reconnectExc.cast(exc);
+                }
+
+                // retry once on the reconnected client; if it succeeds, do not rethrow
+                action.run(client);
+                return;
+            }
+
+            throw exc;
+        } finally {
+            release(client);
+        }
+    }
+
protected abstract C newClient();
protected abstract C reconnect(C client);
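
For orientation, a minimal sketch of how callers (such as HiveCatalog below) use the two entry points: run for actions that return a result, execute for void metastore calls. The pool's type parameters mirror the Hive usage in this change; the database name is a placeholder.

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.paimon.client.ClientPool;
import org.apache.thrift.TException;

import java.util.List;

class ClientPoolUsageSketch {
    // "clients" is assumed to be a pool such as the CachedClientPool built later in this change.
    void example(ClientPool<IMetaStoreClient, TException> clients) throws TException, InterruptedException {
        // run(...) hands a pooled client to the action and returns the action's result
        List<String> databases = clients.run(IMetaStoreClient::getAllDatabases);

        // execute(...) is the void counterpart; it retries once on connection failures when retry is enabled
        clients.execute(client -> client.dropDatabase("placeholder_db", true, false, true));
    }
}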
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 6a1bb9d0acd1..cb651f22da8e 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -26,9 +26,11 @@
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.client.ClientPool;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.pool.CachedClientPool;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
@@ -118,7 +120,8 @@ public class HiveCatalog extends AbstractCatalog {
private final HiveConf hiveConf;
private final String clientClassName;
- private final IMetaStoreClient client;
+ private final Options options;
+ private final ClientPool<IMetaStoreClient, TException> clients;
private final String warehouse;
private final LocationHelper locationHelper;
@@ -136,6 +139,7 @@ public HiveCatalog(
super(fileIO, options);
this.hiveConf = hiveConf;
this.clientClassName = clientClassName;
+ this.options = options;
this.warehouse = warehouse;
boolean needLocationInProperties =
@@ -149,7 +153,7 @@ public HiveCatalog(
locationHelper = new StorageLocationHelper();
}
- this.client = createClient(hiveConf, clientClassName);
+ this.clients = new CachedClientPool(hiveConf, options, clientClassName);
}
@Override
@@ -169,7 +173,11 @@ public Optional metastoreClientFactory(Identifier ident
try {
return Optional.of(
new HiveMetastoreClient.Factory(
- identifier, getDataTableSchema(identifier), hiveConf, clientClassName));
+ identifier,
+ getDataTableSchema(identifier),
+ hiveConf,
+ clientClassName,
+ options));
} catch (TableNotExistException e) {
throw new RuntimeException(
"Table " + identifier + " does not exist. This is unexpected.", e);
@@ -181,47 +189,63 @@ public Path getDataTableLocation(Identifier identifier) {
try {
String databaseName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();
- if (client.tableExists(databaseName, tableName)) {
- String location =
- locationHelper.getTableLocation(client.getTable(databaseName, tableName));
- if (location != null) {
- return new Path(location);
- }
- } else {
- // If the table does not exist,
- // we should use the database path to generate the table path.
- String dbLocation =
- locationHelper.getDatabaseLocation(client.getDatabase(databaseName));
- if (dbLocation != null) {
- return new Path(dbLocation, tableName);
- }
- }
-
- return super.getDataTableLocation(identifier);
+ Optional<Path> tablePath =
+ clients.run(
+ client -> {
+ if (client.tableExists(databaseName, tableName)) {
+ String location =
+ locationHelper.getTableLocation(
+ client.getTable(databaseName, tableName));
+ if (location != null) {
+ return Optional.of(new Path(location));
+ }
+ } else {
+ // If the table does not exist,
+ // we should use the database path to generate the table path.
+ String dbLocation =
+ locationHelper.getDatabaseLocation(
+ client.getDatabase(databaseName));
+ if (dbLocation != null) {
+ return Optional.of(new Path(dbLocation, tableName));
+ }
+ }
+ return Optional.empty();
+ });
+ return tablePath.orElse(super.getDataTableLocation(identifier));
} catch (TException e) {
throw new RuntimeException("Can not get table " + identifier + " from metastore.", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to getDataTableLocation " + identifier, e);
}
}
@Override
public List<String> listDatabases() {
try {
- return client.getAllDatabases();
+ return clients.run(IMetaStoreClient::getAllDatabases);
} catch (TException e) {
throw new RuntimeException("Failed to list all databases", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to listDatabases", e);
}
}
@Override
protected boolean databaseExistsImpl(String databaseName) {
try {
- client.getDatabase(databaseName);
+ clients.run(client -> client.getDatabase(databaseName));
return true;
} catch (NoSuchObjectException e) {
return false;
} catch (TException e) {
throw new RuntimeException(
"Failed to determine if database " + databaseName + " exists", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to databaseExists " + databaseName, e);
}
}
@@ -235,9 +259,12 @@ protected void createDatabaseImpl(String name, Map properties) {
: new Path(database.getLocationUri());
locationHelper.createPathIfRequired(databasePath, fileIO);
locationHelper.specifyDatabaseLocation(databasePath, database);
- client.createDatabase(database);
+ clients.execute(client -> client.createDatabase(database));
} catch (TException | IOException e) {
throw new RuntimeException("Failed to create database " + name, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to createDatabase " + name, e);
}
}
@@ -262,10 +289,13 @@ private Database convertToHiveDatabase(String name, Map properti
@Override
public Map<String, String> loadDatabasePropertiesImpl(String name) {
try {
- return convertToProperties(client.getDatabase(name));
+ return convertToProperties(clients.run(client -> client.getDatabase(name)));
} catch (TException e) {
throw new RuntimeException(
String.format("Failed to get database %s properties", name), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to loadDatabaseProperties " + name, e);
}
}
@@ -275,11 +305,12 @@ public void dropPartition(Identifier identifier, Map partitionSp
TableSchema tableSchema = getDataTableSchema(identifier);
if (!tableSchema.partitionKeys().isEmpty()
&& new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
+
try {
// Do not close client, it is for HiveCatalog
@SuppressWarnings("resource")
HiveMetastoreClient metastoreClient =
- new HiveMetastoreClient(identifier, tableSchema, client);
+ new HiveMetastoreClient(identifier, tableSchema, clients);
metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
@@ -302,27 +333,36 @@ private Map convertToProperties(Database database) {
@Override
protected void dropDatabaseImpl(String name) {
try {
- Database database = client.getDatabase(name);
+ Database database = clients.run(client -> client.getDatabase(name));
String location = locationHelper.getDatabaseLocation(database);
locationHelper.dropPathIfRequired(new Path(location), fileIO);
- client.dropDatabase(name, true, false, true);
+ clients.execute(client -> client.dropDatabase(name, true, false, true));
} catch (TException | IOException e) {
throw new RuntimeException("Failed to drop database " + name, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to dropDatabase " + name, e);
}
}
@Override
protected List<String> listTablesImpl(String databaseName) {
try {
- return client.getAllTables(databaseName).stream()
- .filter(
- tableName -> {
- Identifier identifier = new Identifier(databaseName, tableName);
- return tableExists(identifier);
- })
- .collect(Collectors.toList());
+ return clients.run(
+ client ->
+ client.getAllTables(databaseName).stream()
+ .filter(
+ tableName -> {
+ Identifier identifier =
+ new Identifier(databaseName, tableName);
+ return tableExists(identifier);
+ })
+ .collect(Collectors.toList()));
} catch (TException e) {
throw new RuntimeException("Failed to list all tables in database " + databaseName, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to listTables " + databaseName, e);
}
}
@@ -334,13 +374,22 @@ public boolean tableExists(Identifier identifier) {
Table table;
try {
- table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
+ table =
+ clients.run(
+ client ->
+ client.getTable(
+ identifier.getDatabaseName(),
+ identifier.getObjectName()));
} catch (NoSuchObjectException e) {
return false;
} catch (TException e) {
throw new RuntimeException(
"Cannot determine if table " + identifier.getFullName() + " is a paimon table.",
e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to tableExists " + identifier.getFullName(), e);
}
return isPaimonTable(table);
@@ -374,8 +423,14 @@ private boolean usingExternalTable() {
@Override
protected void dropTableImpl(Identifier identifier) {
try {
- client.dropTable(
- identifier.getDatabaseName(), identifier.getObjectName(), true, false, true);
+ clients.execute(
+ client ->
+ client.dropTable(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ true,
+ false,
+ true));
// When drop a Hive external table, only the hive metadata is deleted and the data files
// are not deleted.
@@ -396,6 +451,10 @@ protected void dropTableImpl(Identifier identifier) {
}
} catch (TException e) {
throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to dropTable " + identifier.getFullName(), e);
}
}
@@ -415,7 +474,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
}
try {
- client.createTable(createHiveTable(identifier, tableSchema));
+ clients.execute(client -> client.createTable(createHiveTable(identifier, tableSchema)));
} catch (Exception e) {
Path path = getDataTableLocation(identifier);
try {
@@ -441,10 +500,10 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
try {
String fromDB = fromTable.getDatabaseName();
String fromTableName = fromTable.getObjectName();
- Table table = client.getTable(fromDB, fromTableName);
+ Table table = clients.run(client -> client.getTable(fromDB, fromTableName));
table.setDbName(toTable.getDatabaseName());
table.setTableName(toTable.getObjectName());
- client.alter_table(fromDB, fromTableName, table);
+ clients.execute(client -> client.alter_table(fromDB, fromTableName, table));
Path fromPath = getDataTableLocation(fromTable);
if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
@@ -463,10 +522,16 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
// update location
locationHelper.specifyTableLocation(table, toPath.toString());
- client.alter_table(toTable.getDatabaseName(), toTable.getObjectName(), table);
+ clients.execute(
+ client ->
+ client.alter_table(
+ toTable.getDatabaseName(), toTable.getObjectName(), table));
}
} catch (TException e) {
throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to renameTable", e);
}
}
@@ -479,7 +544,12 @@ protected void alterTableImpl(Identifier identifier, List changes)
TableSchema schema = schemaManager.commitChanges(changes);
try {
- Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
+ Table table =
+ clients.run(
+ client ->
+ client.getTable(
+ identifier.getDatabaseName(),
+ identifier.getObjectName()));
alterTableToHms(table, identifier, schema);
} catch (Exception te) {
schemaManager.deleteSchema(schema.id());
@@ -488,10 +558,16 @@ protected void alterTableImpl(Identifier identifier, List changes)
}
private void alterTableToHms(Table table, Identifier identifier, TableSchema newSchema)
- throws TException {
+ throws TException, InterruptedException {
updateHmsTablePars(table, newSchema);
updateHmsTable(table, identifier, newSchema);
- client.alter_table(identifier.getDatabaseName(), identifier.getObjectName(), table, true);
+ clients.execute(
+ client ->
+ client.alter_table(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ table,
+ true));
}
@Override
@@ -524,11 +600,8 @@ public void repairDatabase(String databaseName) {
// tables from file system
List<String> tables;
try {
- tables =
- listTablesInFileSystem(
- new Path(
- locationHelper.getDatabaseLocation(
- client.getDatabase(databaseName))));
+ Database database = clients.run(client -> client.getDatabase(databaseName));
+ tables = listTablesInFileSystem(new Path(locationHelper.getDatabaseLocation(database)));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -554,7 +627,11 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
try {
try {
Table table =
- client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
+ clients.run(
+ client ->
+ client.getTable(
+ identifier.getDatabaseName(),
+ identifier.getObjectName()));
checkArgument(
isPaimonTable(table),
"Table %s is not a paimon table in hive metastore.",
@@ -564,7 +641,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
}
} catch (NoSuchObjectException e) {
// hive table does not exist.
- client.createTable(newTable);
+ clients.execute(client -> client.createTable(newTable));
}
// repair partitions
@@ -572,7 +649,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
// Do not close client, it is for HiveCatalog
@SuppressWarnings("resource")
HiveMetastoreClient metastoreClient =
- new HiveMetastoreClient(identifier, tableSchema, client);
+ new HiveMetastoreClient(identifier, tableSchema, clients);
List<BinaryRow> partitions =
getTable(identifier).newReadBuilder().newScan().listPartitions();
for (BinaryRow partition : partitions) {
@@ -586,7 +663,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
@Override
public void close() throws Exception {
- client.close();
+ // do nothing
}
@Override
@@ -690,7 +767,11 @@ private void updateHmsTablePars(Table table, TableSchema schema) {
@VisibleForTesting
public IMetaStoreClient getHmsClient() {
- return client;
+ try {
+ return clients.run(client -> client);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to close hms client:", e);
+ }
}
private FieldSchema convertToFieldSchema(DataField dataField) {
@@ -711,14 +792,10 @@ private Lock lock(Identifier identifier) {
}
HiveCatalogLock lock =
- new HiveCatalogLock(client, checkMaxSleep(hiveConf), acquireTimeout(hiveConf));
+ new HiveCatalogLock(clients, checkMaxSleep(hiveConf), acquireTimeout(hiveConf));
return Lock.fromCatalog(lock, identifier);
}
- static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) {
- return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
- }
-
public static HiveConf createHiveConf(
@Nullable String hiveConfDir,
@Nullable String hadoopConfDir,
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
index 8c3d3829ea28..7bdea2d28f4f 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
@@ -19,6 +19,9 @@
package org.apache.paimon.hive;
import org.apache.paimon.catalog.CatalogLock;
+import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.hive.pool.CachedClientPool;
+import org.apache.paimon.options.Options;
import org.apache.paimon.utils.TimeUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,12 +48,15 @@ public class HiveCatalogLock implements CatalogLock {
static final String LOCK_IDENTIFIER = "hive";
- private final IMetaStoreClient client;
+ private final ClientPool<IMetaStoreClient, TException> clients;
private final long checkMaxSleep;
private final long acquireTimeout;
- public HiveCatalogLock(IMetaStoreClient client, long checkMaxSleep, long acquireTimeout) {
- this.client = client;
+ public HiveCatalogLock(
+ ClientPool<IMetaStoreClient, TException> clients,
+ long checkMaxSleep,
+ long acquireTimeout) {
+ this.clients = clients;
this.checkMaxSleep = checkMaxSleep;
this.acquireTimeout = acquireTimeout;
}
@@ -76,7 +82,7 @@ private long lock(String database, String table)
Collections.singletonList(lockComponent),
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
- LockResponse lockResponse = this.client.lock(lockRequest);
+ LockResponse lockResponse = clients.run(client -> client.lock(lockRequest));
long nextSleep = 50;
long startRetry = System.currentTimeMillis();
@@ -87,7 +93,8 @@ private long lock(String database, String table)
}
Thread.sleep(nextSleep);
- lockResponse = client.checkLock(lockResponse.getLockid());
+ final LockResponse tempLockResponse = lockResponse;
+ lockResponse = clients.run(client -> client.checkLock(tempLockResponse.getLockid()));
if (System.currentTimeMillis() - startRetry > acquireTimeout) {
break;
}
@@ -96,7 +103,8 @@ private long lock(String database, String table)
if (lockResponse.getState() != LockState.ACQUIRED) {
if (lockResponse.getState() == LockState.WAITING) {
- client.unlock(lockResponse.getLockid());
+ final LockResponse tempLockResponse = lockResponse;
+ clients.execute(client -> client.unlock(tempLockResponse.getLockid()));
}
throw new RuntimeException(
"Acquire lock failed with time: " + Duration.ofMillis(retryDuration));
@@ -104,13 +112,18 @@ private long lock(String database, String table)
return lockResponse.getLockid();
}
- private void unlock(long lockId) throws TException {
- client.unlock(lockId);
+ private void unlock(long lockId) throws TException, InterruptedException {
+ clients.execute(client -> client.unlock(lockId));
}
@Override
public void close() {
- this.client.close();
+ // do nothing
+ }
+
+ public static ClientPool<IMetaStoreClient, TException> createClients(
+ HiveConf conf, Options options, String clientClassName) {
+ return new CachedClientPool(conf, options, clientClassName);
}
public static long checkMaxSleep(HiveConf conf) {
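
A side note on the final LockResponse tempLockResponse copies above: Java lambdas may capture only effectively final locals, and lockResponse is reassigned inside the retry loop, so it cannot be referenced directly inside the clients.run / clients.execute lambdas. A minimal illustration of that rule, with placeholder names not taken from this change:

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.paimon.client.ClientPool;
import org.apache.thrift.TException;

class EffectivelyFinalSketch {
    void example(ClientPool<IMetaStoreClient, TException> clients, long firstId, long retriedId)
            throws TException, InterruptedException {
        long lockId = firstId;               // reassigned below, so NOT effectively final
        // clients.execute(client -> client.unlock(lockId));   // would not compile
        final long currentLockId = lockId;   // snapshot that the lambda is allowed to capture
        clients.execute(client -> client.unlock(currentLockId));
        lockId = retriedId;                  // this reassignment is what forbids capturing lockId directly
    }
}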
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java
index 7c05ce3ee520..764308a31be8 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java
@@ -27,6 +27,7 @@
import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
+import static org.apache.paimon.hive.HiveCatalogLock.createClients;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Catalog lock factory for hive. */
@@ -40,7 +41,7 @@ public CatalogLock createLock(CatalogLockContext context) {
HiveCatalogLockContext hiveLockContext = (HiveCatalogLockContext) context;
HiveConf conf = hiveLockContext.hiveConf().conf();
return new HiveCatalogLock(
- HiveCatalog.createClient(conf, hiveLockContext.clientClassName()),
+ createClients(conf, hiveLockContext.options(), hiveLockContext.clientClassName()),
checkMaxSleep(conf),
acquireTimeout(conf));
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
index 0f1b4d454e25..38f73bc6bd65 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
@@ -20,6 +20,11 @@
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.description.Description;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.options.description.TextElement.text;
/** Options for hive catalog. */
public final class HiveCatalogOptions {
@@ -52,5 +57,33 @@ public final class HiveCatalogOptions {
+ "If you don't want to access the location by the filesystem of hive when using a object storage such as s3,oss\n"
+ "you can set this option to true.\n");
+ public static final ConfigOption<Long> CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
+ ConfigOptions.key("client-pool-cache.eviction-interval-ms")
+ .longType()
+ .defaultValue(TimeUnit.MINUTES.toMillis(5))
+ .withDescription("Setting the client's pool cache eviction interval(ms).\n");
+
+ public static final ConfigOption<String> CLIENT_POOL_CACHE_KEYS =
+ ConfigOptions.key("client-pool-cache.keys")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Specify client cache key, multiple elements separated by commas.")
+ .linebreak()
+ .list(
+ text(
+ "\"ugi\": the Hadoop UserGroupInformation instance that represents the current user using the cache."))
+ .list(
+ text(
+ "\"user_name\" similar to UGI but only includes the user's name determined by UserGroupInformation#getUserName."))
+ .list(
+ text(
+ "\"conf\": name of an arbitrary configuration. "
+ + "The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a \"conf:\" prefix which is followed by the configuration name. "
+ + "E.g. specifying \"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified."))
+ .build());
+
private HiveCatalogOptions() {}
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 6f3473e8eac7..0192e0022fb0 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -20,8 +20,11 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.client.ClientPool;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.hive.pool.CachedClientPool;
import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
@@ -32,6 +35,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -43,11 +47,14 @@ public class HiveMetastoreClient implements MetastoreClient {
private final Identifier identifier;
private final InternalRowPartitionComputer partitionComputer;
- private final IMetaStoreClient client;
+ private final ClientPool<IMetaStoreClient, TException> clients;
private final StorageDescriptor sd;
- public HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client)
- throws Exception {
+ HiveMetastoreClient(
+ Identifier identifier,
+ TableSchema schema,
+ ClientPool<IMetaStoreClient, TException> clients)
+ throws TException, InterruptedException {
this.identifier = identifier;
this.partitionComputer =
new InternalRowPartitionComputer(
@@ -55,8 +62,15 @@ public HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStore
schema.logicalPartitionType(),
schema.partitionKeys().toArray(new String[0]));
- this.client = client;
- this.sd = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()).getSd();
+ this.clients = clients;
+ this.sd =
+ this.clients
+ .run(
+ client ->
+ client.getTable(
+ identifier.getDatabaseName(),
+ identifier.getObjectName()))
+ .getSd();
}
@Override
@@ -68,8 +82,12 @@ public void addPartition(BinaryRow partition) throws Exception {
public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
try {
- client.getPartition(
- identifier.getDatabaseName(), identifier.getObjectName(), partitionValues);
+ clients.execute(
+ client ->
+ client.getPartition(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ partitionValues));
// do nothing if the partition already exists
} catch (NoSuchObjectException e) {
// partition not found, create new partition
@@ -88,7 +106,7 @@ public void addPartition(LinkedHashMap partitionSpec) throws Exc
hivePartition.setCreateTime(currentTime);
hivePartition.setLastAccessTime(currentTime);
- client.add_partition(hivePartition);
+ clients.execute(client -> client.add_partition(hivePartition));
}
}
@@ -96,11 +114,13 @@ public void addPartition(LinkedHashMap partitionSpec) throws Exc
public void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
try {
- client.dropPartition(
- identifier.getDatabaseName(),
- identifier.getObjectName(),
- partitionValues,
- false);
+ clients.execute(
+ client ->
+ client.dropPartition(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ partitionValues,
+ false));
} catch (NoSuchObjectException e) {
// do nothing if the partition not exists
}
@@ -109,11 +129,13 @@ public void deletePartition(LinkedHashMap partitionSpec) throws
@Override
public void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception {
try {
- client.markPartitionForEvent(
- identifier.getDatabaseName(),
- identifier.getObjectName(),
- partitionSpec,
- PartitionEventType.LOAD_DONE);
+ clients.execute(
+ client ->
+ client.markPartitionForEvent(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ partitionSpec,
+ PartitionEventType.LOAD_DONE));
} catch (NoSuchObjectException e) {
// do nothing if the partition not exists
}
@@ -121,11 +143,11 @@ public void markDone(LinkedHashMap partitionSpec) throws Excepti
@Override
public void close() throws Exception {
- client.close();
+ // do nothing
}
- public IMetaStoreClient client() {
- return this.client;
+ public IMetaStoreClient client() throws TException, InterruptedException {
+ return clients.run(client -> client);
}
/** Factory to create {@link HiveMetastoreClient}. */
@@ -137,16 +159,19 @@ public static class Factory implements MetastoreClient.Factory {
private final TableSchema schema;
private final SerializableHiveConf hiveConf;
private final String clientClassName;
+ private final Options options;
public Factory(
Identifier identifier,
TableSchema schema,
HiveConf hiveConf,
- String clientClassName) {
+ String clientClassName,
+ Options options) {
this.identifier = identifier;
this.schema = schema;
this.hiveConf = new SerializableHiveConf(hiveConf);
this.clientClassName = clientClassName;
+ this.options = options;
}
@Override
@@ -154,9 +179,15 @@ public MetastoreClient create() {
HiveConf conf = hiveConf.conf();
try {
return new HiveMetastoreClient(
- identifier, schema, HiveCatalog.createClient(conf, clientClassName));
- } catch (Exception e) {
- throw new RuntimeException(e);
+ identifier, schema, new CachedClientPool(conf, options, clientClassName));
+ } catch (TException e) {
+ throw new RuntimeException(
+ "Can not get table " + identifier + " info from metastore.", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to new HiveMetastoreClient for table " + identifier,
+ e);
}
}
}
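
One property of the new CachedClientPool introduced below is worth calling out: the Caffeine cache is held in a static field, so catalog instances in the same JVM that resolve to the same cache key share one underlying HiveClientPool. A hedged sketch of the effect (the constructor arguments are placeholders; clientPool() is the @VisibleForTesting accessor defined in the class):

// Two pools built from the same configuration, options and client class resolve to the
// same cached HiveClientPool, because the cache key (ugi / user_name / conf:* elements)
// is identical for both.
CachedClientPool poolA = new CachedClientPool(hiveConf, options, clientClassName);
CachedClientPool poolB = new CachedClientPool(hiveConf, options, clientClassName);
// poolA.clientPool() == poolB.clientPool()  -> true for the same cache key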
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java
new file mode 100644
index 000000000000..3b2cf1540b4e
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.pool;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Sets;
+import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.hive.HiveCatalogOptions.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS;
+import static org.apache.paimon.hive.HiveCatalogOptions.CLIENT_POOL_CACHE_KEYS;
+import static org.apache.paimon.options.CatalogOptions.CLIENT_POOL_SIZE;
+
+/**
+ * A ClientPool that caches the underlying HiveClientPool instances.
+ *
+ *
+ * <p>Mostly copied from Iceberg.
+ */
+public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {
+
+ private static final String CONF_ELEMENT_PREFIX = "conf:";
+
+ private static Cache<Key, HiveClientPool> clientPoolCache;
+
+ private final Configuration conf;
+ private final int clientPoolSize;
+ private final long evictionInterval;
+ private final Key key;
+ private final String clientClassName;
+
+ public CachedClientPool(Configuration conf, Options options, String clientClassName) {
+ this.conf = conf;
+ this.clientPoolSize = options.get(CLIENT_POOL_SIZE);
+ this.evictionInterval = options.get(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS);
+ this.key = extractKey(options.get(CLIENT_POOL_CACHE_KEYS), conf);
+ this.clientClassName = clientClassName;
+ init();
+ }
+
+ @VisibleForTesting
+ HiveClientPool clientPool() {
+ return clientPoolCache.get(
+ key, k -> new HiveClientPool(clientPoolSize, conf, clientClassName));
+ }
+
+ private synchronized void init() {
+ if (clientPoolCache == null) {
+ // Since Caffeine does not ensure that the removal listener will be invoked after
+ // expiration, we use a scheduler with one thread to clean up expired clients.
+ clientPoolCache =
+ Caffeine.newBuilder()
+ .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
+ .removalListener(
+ (ignored, value, cause) -> {
+ if (value != null) {
+ ((HiveClientPool) value).close();
+ }
+ })
+ .scheduler(
+ Scheduler.forScheduledExecutorService(
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("hive-metastore-cleaner")
+ .build())))
+ .build();
+ }
+ }
+
+ @VisibleForTesting
+ static Cache<Key, HiveClientPool> clientPoolCache() {
+ return clientPoolCache;
+ }
+
+ @Override
+ public <R> R run(Action<R, IMetaStoreClient, TException> action)
+ throws TException, InterruptedException {
+ return clientPool().run(action);
+ }
+
+ @Override
+ public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
+ throws TException, InterruptedException {
+ return clientPool().run(action, retry);
+ }
+
+ @Override
+ public void execute(ExecuteAction<IMetaStoreClient, TException> action)
+ throws TException, InterruptedException {
+ clientPool().execute(action);
+ }
+
+ @Override
+ public void execute(ExecuteAction<IMetaStoreClient, TException> action, boolean retry)
+ throws TException, InterruptedException {
+ clientPool().execute(action, retry);
+ }
+
+ @VisibleForTesting
+ static Key extractKey(String cacheKeys, Configuration conf) {
+ // generate key elements in a certain order, so that the Key instances are comparable
+ List