From 47409b1eaee3a49103a2f716e92da2966380cbb0 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Tue, 26 Mar 2024 19:11:27 +0800 Subject: [PATCH] add hive client pool, co-author: @XuQianJin-Stars --- .../org/apache/paimon/client/ClientPool.java | 41 ++- .../org/apache/paimon/hive/HiveCatalog.java | 178 +++++++++---- .../paimon/hive/HiveCatalogFactory.java | 5 - .../apache/paimon/hive/HiveCatalogLock.java | 34 ++- .../paimon/hive/HiveCatalogOptions.java | 33 +++ .../paimon/hive/HiveMetastoreClient.java | 65 +++-- .../paimon/hive/pool/CachedClientPool.java | 248 ++++++++++++++++++ .../paimon/hive/pool/HiveClientPool.java | 84 ++++++ .../apache/paimon/hive/HiveCatalogTest.java | 26 +- .../paimon/hive/pool/TestHiveClientPool.java | 190 ++++++++++++++ .../apache/paimon/hive/TestHiveMetastore.java | 1 - 11 files changed, 803 insertions(+), 102 deletions(-) create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/HiveClientPool.java create mode 100644 paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestHiveClientPool.java diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java index deddf75a2452..b437e68935e2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java @@ -33,15 +33,24 @@ /** Client pool for using multiple clients to execute actions. */ public interface ClientPool { - /** Action interface for client. */ + /** Action interface with return object for client. */ interface Action { R run(C client) throws E; } + /** Action interface with return void for client. 
*/ + interface ExecuteAction { + void run(C client) throws E; + } + R run(Action action) throws E, InterruptedException; R run(Action action, boolean retry) throws E, InterruptedException; + void execute(ExecuteAction action) throws E, InterruptedException; + + void execute(ExecuteAction action, boolean retry) throws E, InterruptedException; + /** Default implementation for {@link ClientPool}. */ abstract class ClientPoolImpl implements Closeable, ClientPool { private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class); @@ -93,6 +102,37 @@ public R run(Action action, boolean retry) throws E, InterruptedExc } } + @Override + public void execute(ExecuteAction action) throws E, InterruptedException { + execute(action, retryByDefault); + } + + @Override + public void execute(ExecuteAction action, boolean retry) + throws E, InterruptedException { + C client = get(); + try { + action.run(client); + } catch (Exception exc) { + if (retry && isConnectionException(exc)) { + try { + client = reconnect(client); + } catch (Exception ignored) { + // if reconnection throws any exception, rethrow the original failure + throw reconnectExc.cast(exc); + } + + action.run(client); + return; // the retry succeeded, so the original failure must not be rethrown + } + + throw exc; + + } finally { + release(client); + } + } + protected abstract C newClient(); protected abstract C reconnect(C client); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 589e920370e0..5a2356944fd6 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -25,8 +25,10 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; import
org.apache.paimon.fs.Path; +import org.apache.paimon.hive.pool.CachedClientPool; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; @@ -114,7 +116,8 @@ public class HiveCatalog extends AbstractCatalog { private final HiveConf hiveConf; private final String clientClassName; - private final IMetaStoreClient client; + private final Options options; + private final ClientPool clients; private final String warehouse; private final LocationHelper locationHelper; @@ -132,6 +135,7 @@ public HiveCatalog( super(fileIO, options); this.hiveConf = hiveConf; this.clientClassName = clientClassName; + this.options = options; this.warehouse = warehouse; boolean needLocationInProperties = @@ -145,14 +149,14 @@ public HiveCatalog( locationHelper = new StorageLocationHelper(); } - this.client = createClient(hiveConf, clientClassName); + this.clients = new CachedClientPool(hiveConf, options, clientClassName); } @Override public Optional lockContext() { return Optional.of( new HiveCatalogLock.HiveLockContext( - new SerializableHiveConf(hiveConf), clientClassName)); + new SerializableHiveConf(hiveConf), clientClassName, options)); } @Override @@ -160,7 +164,11 @@ public Optional metastoreClientFactory(Identifier ident try { return Optional.of( new HiveMetastoreClient.Factory( - identifier, getDataTableSchema(identifier), hiveConf, clientClassName)); + identifier, + getDataTableSchema(identifier), + hiveConf, + clientClassName, + options)); } catch (TableNotExistException e) { throw new RuntimeException( "Table " + identifier + " does not exist. 
This is unexpected.", e); @@ -172,47 +180,63 @@ public Path getDataTableLocation(Identifier identifier) { try { String databaseName = identifier.getDatabaseName(); String tableName = identifier.getObjectName(); - if (client.tableExists(databaseName, tableName)) { - String location = - locationHelper.getTableLocation(client.getTable(databaseName, tableName)); - if (location != null) { - return new Path(location); - } - } else { - // If the table does not exist, - // we should use the database path to generate the table path. - String dbLocation = - locationHelper.getDatabaseLocation(client.getDatabase(databaseName)); - if (dbLocation != null) { - return new Path(dbLocation, tableName); - } - } - - return super.getDataTableLocation(identifier); + Optional tablePath = + clients.run( + client -> { + if (client.tableExists(databaseName, tableName)) { + String location = + locationHelper.getTableLocation( + client.getTable(databaseName, tableName)); + if (location != null) { + return Optional.of(new Path(location)); + } + } else { + // If the table does not exist, + // we should use the database path to generate the table path. 
+ String dbLocation = + locationHelper.getDatabaseLocation( + client.getDatabase(databaseName)); + if (dbLocation != null) { + return Optional.of(new Path(dbLocation, tableName)); + } + } + return Optional.empty(); + }); + return tablePath.orElse(super.getDataTableLocation(identifier)); } catch (TException e) { throw new RuntimeException("Can not get table " + identifier + " from metastore.", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to getDataTableLocation " + identifier, e); } } @Override public List listDatabases() { try { - return client.getAllDatabases(); + return clients.run(IMetaStoreClient::getAllDatabases); } catch (TException e) { throw new RuntimeException("Failed to list all databases", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listDatabases", e); } } @Override protected boolean databaseExistsImpl(String databaseName) { try { - client.getDatabase(databaseName); + clients.run(client -> client.getDatabase(databaseName)); return true; } catch (NoSuchObjectException e) { return false; } catch (TException e) { throw new RuntimeException( "Failed to determine if database " + databaseName + " exists", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to databaseExists " + databaseName, e); } } @@ -226,9 +250,12 @@ protected void createDatabaseImpl(String name, Map properties) { : new Path(database.getLocationUri()); locationHelper.createPathIfRequired(databasePath, fileIO); locationHelper.specifyDatabaseLocation(databasePath, database); - client.createDatabase(database); + clients.execute(client -> client.createDatabase(database)); } catch (TException | IOException e) { throw new RuntimeException("Failed to create database " + name, e); + } catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to createDatabase " + name, e); } } @@ -253,10 +280,13 @@ private Database convertToHiveDatabase(String name, Map properti @Override public Map loadDatabasePropertiesImpl(String name) { try { - return convertToProperties(client.getDatabase(name)); + return convertToProperties(clients.run(client -> client.getDatabase(name))); } catch (TException e) { throw new RuntimeException( String.format("Failed to get database %s properties", name), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to loadDatabaseProperties " + name, e); } } @@ -274,27 +304,36 @@ private Map convertToProperties(Database database) { @Override protected void dropDatabaseImpl(String name) { try { - Database database = client.getDatabase(name); + Database database = clients.run(client -> client.getDatabase(name)); String location = locationHelper.getDatabaseLocation(database); locationHelper.dropPathIfRequired(new Path(location), fileIO); - client.dropDatabase(name, true, false, true); + clients.execute(client -> client.dropDatabase(name, true, false, true)); } catch (TException | IOException e) { throw new RuntimeException("Failed to drop database " + name, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropDatabase " + name, e); } } @Override protected List listTablesImpl(String databaseName) { try { - return client.getAllTables(databaseName).stream() - .filter( - tableName -> { - Identifier identifier = new Identifier(databaseName, tableName); - return tableExists(identifier); - }) - .collect(Collectors.toList()); + return clients.run( + client -> + client.getAllTables(databaseName).stream() + .filter( + tableName -> { + Identifier identifier = + new Identifier(databaseName, tableName); + return tableExists(identifier); + }) + 
.collect(Collectors.toList())); } catch (TException e) { throw new RuntimeException("Failed to list all tables in database " + databaseName, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listTables " + databaseName, e); } } @@ -306,13 +345,22 @@ public boolean tableExists(Identifier identifier) { Table table; try { - table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); + table = + clients.run( + client -> + client.getTable( + identifier.getDatabaseName(), + identifier.getObjectName())); } catch (NoSuchObjectException e) { return false; } catch (TException e) { throw new RuntimeException( "Cannot determine if table " + identifier.getFullName() + " is a paimon table.", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to tableExists " + identifier.getFullName(), e); } return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table); @@ -338,8 +386,14 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis @Override protected void dropTableImpl(Identifier identifier) { try { - client.dropTable( - identifier.getDatabaseName(), identifier.getObjectName(), true, false, true); + clients.execute( + client -> + client.dropTable( + identifier.getDatabaseName(), + identifier.getObjectName(), + true, + false, + true)); // When drop a Hive external table, only the hive metadata is deleted and the data files // are not deleted. 
@@ -364,6 +418,10 @@ protected void dropTableImpl(Identifier identifier) { } } catch (TException e) { throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to dropTable " + identifier.getFullName(), e); } } @@ -388,7 +446,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) { convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX)); try { updateHmsTable(table, identifier, tableSchema); - client.createTable(table); + clients.execute(client -> client.createTable(table)); } catch (Exception e) { Path path = getDataTableLocation(identifier); try { @@ -405,13 +463,13 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { String fromDB = fromTable.getDatabaseName(); String fromTableName = fromTable.getObjectName(); - Table table = client.getTable(fromDB, fromTableName); + Table table = clients.run(client -> client.getTable(fromDB, fromTableName)); table.setDbName(toTable.getDatabaseName()); table.setTableName(toTable.getObjectName()); - client.alter_table(fromDB, fromTableName, table); + clients.execute(client -> client.alter_table(fromDB, fromTableName, table)); Path fromPath = getDataTableLocation(fromTable); - if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) { + if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in // the file system and tables in the Hive Metastore. 
Path toPath = getDataTableLocation(toTable); @@ -427,10 +485,16 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { // update location locationHelper.specifyTableLocation(table, toPath.toString()); - client.alter_table(toTable.getDatabaseName(), toTable.getObjectName(), table); + clients.execute( + client -> + client.alter_table( + toTable.getDatabaseName(), toTable.getObjectName(), table)); } } catch (TException e) { throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to renameTable", e); } } @@ -444,11 +508,21 @@ protected void alterTableImpl(Identifier identifier, List changes) try { // sync to hive hms - Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); + Table table = + clients.run( + client -> + client.getTable( + identifier.getDatabaseName(), + identifier.getObjectName())); updateHmsTablePars(table, schema); updateHmsTable(table, identifier, schema); - client.alter_table( - identifier.getDatabaseName(), identifier.getObjectName(), table, true); + clients.execute( + client -> + client.alter_table( + identifier.getDatabaseName(), + identifier.getObjectName(), + table, + true)); } catch (Exception te) { schemaManager.deleteSchema(schema.id()); throw new RuntimeException(te); @@ -462,7 +536,7 @@ public boolean caseSensitive() { @Override public void close() throws Exception { - client.close(); + clients.execute(IMetaStoreClient::close); } @Override @@ -566,7 +640,11 @@ private void updateHmsTablePars(Table table, TableSchema schema) { @VisibleForTesting public IMetaStoreClient getHmsClient() { - return client; + try { + return clients.run(client -> client); + } catch (Exception e) { + throw new RuntimeException("Failed to get hms client:", e); + } } private FieldSchema convertToFieldSchema(DataField dataField) { @@ -587,14 +665,10 @@ private
Lock lock(Identifier identifier) { } HiveCatalogLock lock = - new HiveCatalogLock(client, checkMaxSleep(hiveConf), acquireTimeout(hiveConf)); + new HiveCatalogLock(clients, checkMaxSleep(hiveConf), acquireTimeout(hiveConf)); return Lock.fromCatalog(lock, identifier); } - static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) { - return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName); - } - public static HiveConf createHiveConf( @Nullable String hiveConfDir, @Nullable String hadoopConfDir, diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java index cc51df265095..95da0037168c 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java @@ -24,16 +24,11 @@ import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; /** Factory to create {@link HiveCatalog}. 
*/ public class HiveCatalogFactory implements CatalogFactory { - private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class); - public static final ConfigOption METASTORE_CLIENT_CLASS = ConfigOptions.key("metastore.client.class") .stringType() diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java index c49cd020c654..44d6b8c988f5 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java @@ -19,6 +19,9 @@ package org.apache.paimon.hive; import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.client.ClientPool; +import org.apache.paimon.hive.pool.CachedClientPool; +import org.apache.paimon.options.Options; import org.apache.paimon.utils.TimeUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -46,12 +49,15 @@ public class HiveCatalogLock implements CatalogLock { static final String LOCK_IDENTIFIER = "hive"; - private final IMetaStoreClient client; + private final ClientPool clients; private final long checkMaxSleep; private final long acquireTimeout; - public HiveCatalogLock(IMetaStoreClient client, long checkMaxSleep, long acquireTimeout) { - this.client = client; + public HiveCatalogLock( + ClientPool clients, + long checkMaxSleep, + long acquireTimeout) { + this.clients = clients; this.checkMaxSleep = checkMaxSleep; this.acquireTimeout = acquireTimeout; } @@ -77,7 +83,7 @@ private long lock(String database, String table) Collections.singletonList(lockComponent), System.getProperty("user.name"), InetAddress.getLocalHost().getHostName()); - LockResponse lockResponse = this.client.lock(lockRequest); + LockResponse lockResponse = clients.run(client -> client.lock(lockRequest)); long nextSleep = 50; long startRetry = System.currentTimeMillis(); @@ 
-88,7 +94,8 @@ private long lock(String database, String table) } Thread.sleep(nextSleep); - lockResponse = client.checkLock(lockResponse.getLockid()); + final LockResponse tempLockResponse = lockResponse; + lockResponse = clients.run(client -> client.checkLock(tempLockResponse.getLockid())); if (System.currentTimeMillis() - startRetry > acquireTimeout) { break; } @@ -97,7 +104,8 @@ private long lock(String database, String table) if (lockResponse.getState() != LockState.ACQUIRED) { if (lockResponse.getState() == LockState.WAITING) { - client.unlock(lockResponse.getLockid()); + final LockResponse tempLockResponse = lockResponse; + clients.execute(client -> client.unlock(tempLockResponse.getLockid())); } throw new RuntimeException( "Acquire lock failed with time: " + Duration.ofMillis(retryDuration)); @@ -105,13 +113,13 @@ private long lock(String database, String table) return lockResponse.getLockid(); } - private void unlock(long lockId) throws TException { - client.unlock(lockId); + private void unlock(long lockId) throws TException, InterruptedException { + clients.execute(client -> client.unlock(lockId)); } @Override public void close() { - this.client.close(); + // do nothing } /** Catalog lock factory for hive. 
*/ @@ -125,7 +133,8 @@ public CatalogLock create(LockContext context) { HiveLockContext hiveLockContext = (HiveLockContext) context; HiveConf conf = hiveLockContext.hiveConf.conf(); return new HiveCatalogLock( - HiveCatalog.createClient(conf, hiveLockContext.clientClassName), + new CachedClientPool( + conf, hiveLockContext.options, hiveLockContext.clientClassName), checkMaxSleep(conf), acquireTimeout(conf)); } @@ -155,10 +164,13 @@ public static long acquireTimeout(HiveConf conf) { static class HiveLockContext implements LockContext { private final SerializableHiveConf hiveConf; private final String clientClassName; + private final Options options; - public HiveLockContext(SerializableHiveConf hiveConf, String clientClassName) { + public HiveLockContext( + SerializableHiveConf hiveConf, String clientClassName, Options options) { this.hiveConf = hiveConf; this.clientClassName = clientClassName; + this.options = options; } } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java index 0f1b4d454e25..38f73bc6bd65 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java @@ -20,6 +20,11 @@ import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; +import org.apache.paimon.options.description.Description; + +import java.util.concurrent.TimeUnit; + +import static org.apache.paimon.options.description.TextElement.text; /** Options for hive catalog. 
*/ public final class HiveCatalogOptions { @@ -52,5 +57,33 @@ public final class HiveCatalogOptions { + "If you don't want to access the location by the filesystem of hive when using a object storage such as s3,oss\n" + "you can set this option to true.\n"); + public static final ConfigOption CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS = + ConfigOptions.key("client-pool-cache.eviction-interval-ms") + .longType() + .defaultValue(TimeUnit.MINUTES.toMillis(5)) + .withDescription("Setting the client's pool cache eviction interval(ms).\n"); + + public static final ConfigOption CLIENT_POOL_CACHE_KEYS = + ConfigOptions.key("client-pool-cache.keys") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "Specify client cache key, multiple elements separated by commas.") + .linebreak() + .list( + text( + "\"ugi\": the Hadoop UserGroupInformation instance that represents the current user using the cache.")) + .list( + text( + "\"user_name\" similar to UGI but only includes the user's name determined by UserGroupInformation#getUserName.")) + .list( + text( + "\"conf\": name of an arbitrary configuration. " + + "The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a \"conf:\" prefix which is followed by the configuration name. " + + "E.g. specifying \"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with different default catalog wouldn't share the same client pool. 
Multiple conf elements can be specified.")) + .build()); + private HiveCatalogOptions() {} } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index 031b1848a01e..e96cc66bc93f 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -20,8 +20,11 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.hive.pool.CachedClientPool; import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.utils.PartitionPathUtils; import org.apache.paimon.utils.RowDataPartitionComputer; @@ -31,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.thrift.TException; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -42,11 +46,14 @@ public class HiveMetastoreClient implements MetastoreClient { private final Identifier identifier; private final RowDataPartitionComputer partitionComputer; - private final IMetaStoreClient client; + private final ClientPool clients; private final StorageDescriptor sd; - private HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client) - throws Exception { + private HiveMetastoreClient( + Identifier identifier, + TableSchema schema, + ClientPool clients) + throws TException, InterruptedException { this.identifier = identifier; this.partitionComputer = new RowDataPartitionComputer( @@ -54,8 +61,15 @@ private 
HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStor schema.logicalPartitionType(), schema.partitionKeys().toArray(new String[0])); - this.client = client; - this.sd = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()).getSd(); + this.clients = clients; + this.sd = + this.clients + .run( + client -> + client.getTable( + identifier.getDatabaseName(), + identifier.getObjectName())) + .getSd(); } @Override @@ -67,8 +81,12 @@ public void addPartition(BinaryRow partition) throws Exception { public void addPartition(LinkedHashMap partitionSpec) throws Exception { List partitionValues = new ArrayList<>(partitionSpec.values()); try { - client.getPartition( - identifier.getDatabaseName(), identifier.getObjectName(), partitionValues); + clients.execute( + client -> + client.getPartition( + identifier.getDatabaseName(), + identifier.getObjectName(), + partitionValues)); // do nothing if the partition already exists } catch (NoSuchObjectException e) { // partition not found, create new partition @@ -87,7 +105,7 @@ public void addPartition(LinkedHashMap partitionSpec) throws Exc hivePartition.setCreateTime(currentTime); hivePartition.setLastAccessTime(currentTime); - client.add_partition(hivePartition); + clients.execute(client -> client.add_partition(hivePartition)); } } @@ -95,11 +113,13 @@ public void addPartition(LinkedHashMap partitionSpec) throws Exc public void deletePartition(LinkedHashMap partitionSpec) throws Exception { List partitionValues = new ArrayList<>(partitionSpec.values()); try { - client.dropPartition( - identifier.getDatabaseName(), - identifier.getObjectName(), - partitionValues, - false); + clients.execute( + client -> + client.dropPartition( + identifier.getDatabaseName(), + identifier.getObjectName(), + partitionValues, + false)); } catch (NoSuchObjectException e) { // do nothing if the partition not exists } @@ -107,7 +127,7 @@ public void deletePartition(LinkedHashMap partitionSpec) throws @Override public 
void close() throws Exception { - client.close(); + // do nothing } /** Factory to create {@link HiveMetastoreClient}. */ @@ -119,16 +139,19 @@ public static class Factory implements MetastoreClient.Factory { private final TableSchema schema; private final SerializableHiveConf hiveConf; private final String clientClassName; + private final Options options; public Factory( Identifier identifier, TableSchema schema, HiveConf hiveConf, - String clientClassName) { + String clientClassName, + Options options) { this.identifier = identifier; this.schema = schema; this.hiveConf = new SerializableHiveConf(hiveConf); this.clientClassName = clientClassName; + this.options = options; } @Override @@ -136,9 +159,15 @@ public MetastoreClient create() { HiveConf conf = hiveConf.conf(); try { return new HiveMetastoreClient( - identifier, schema, HiveCatalog.createClient(conf, clientClassName)); - } catch (Exception e) { - throw new RuntimeException(e); + identifier, schema, new CachedClientPool(conf, options, clientClassName)); + } catch (TException e) { + throw new RuntimeException( + "Can not get table " + identifier + " info from metastore.", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to new HiveMetastoreClient for table " + identifier, + e); } } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java new file mode 100644 index 000000000000..e79ed623bcaa --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive.pool; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.client.ClientPool; +import org.apache.paimon.hive.HiveCatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.Preconditions; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; +import org.apache.paimon.shade.guava30.com.google.common.collect.Sets; +import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.paimon.hive.HiveCatalogOptions.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS; +import static org.apache.paimon.hive.HiveCatalogOptions.CLIENT_POOL_CACHE_KEYS; +import static org.apache.paimon.options.CatalogOptions.CLIENT_POOL_SIZE; + +/** + * A ClientPool that caches the underlying HiveClientPool instances. + * + *

Mostly copied from iceberg. + */ +public class CachedClientPool implements ClientPool { + + private static final String CONF_ELEMENT_PREFIX = "conf:"; + + private static Cache clientPoolCache; + + private final Configuration conf; + private final int clientPoolSize; + private final long evictionInterval; + private final Key key; + private final String clientClassName; + + public CachedClientPool(Configuration conf, Options options, String clientClassName) { + this.conf = conf; + this.clientPoolSize = options.get(CLIENT_POOL_SIZE); + this.evictionInterval = options.get(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS); + this.key = extractKey(options.get(CLIENT_POOL_CACHE_KEYS), conf); + this.clientClassName = clientClassName; + init(); + } + + @VisibleForTesting + HiveClientPool clientPool() { + return clientPoolCache.get( + key, k -> new HiveClientPool(clientPoolSize, conf, clientClassName)); + } + + private synchronized void init() { + if (clientPoolCache == null) { + // Since Caffeine does not ensure that removalListener will be invoked after expiration, + // we use a scheduler with one thread to clean up expired clients.
+ clientPoolCache = + Caffeine.newBuilder() + .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) + .removalListener( + (ignored, value, cause) -> ((HiveClientPool) value).close()) + .scheduler( + Scheduler.forScheduledExecutorService( + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("hive-metastore-cleaner") + .build()))) + .build(); + } + } + + @VisibleForTesting + static Cache clientPoolCache() { + return clientPoolCache; + } + + @Override + public R run(Action action) + throws TException, InterruptedException { + return clientPool().run(action); + } + + @Override + public R run(Action action, boolean retry) + throws TException, InterruptedException { + return clientPool().run(action, retry); + } + + @Override + public void execute(ExecuteAction action) + throws TException, InterruptedException { + clientPool().execute(action); + } + + @Override + public void execute(ExecuteAction action, boolean retry) + throws TException, InterruptedException { + clientPool().execute(action, retry); + } + + @VisibleForTesting + static Key extractKey(String cacheKeys, Configuration conf) { + // generate key elements in a certain order, so that the Key instances are comparable + List elements = Lists.newArrayList(); + elements.add(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "")); + elements.add(HiveCatalogOptions.IDENTIFIER); + if (cacheKeys == null || cacheKeys.isEmpty()) { + return Key.of(elements); + } + + Set types = Sets.newTreeSet(Comparator.comparingInt(Enum::ordinal)); + Map confElements = Maps.newTreeMap(); + for (String element : cacheKeys.split(",", -1)) { + String trimmed = element.trim(); + if (trimmed.toLowerCase(Locale.ROOT).startsWith(CONF_ELEMENT_PREFIX)) { + String key = trimmed.substring(CONF_ELEMENT_PREFIX.length()); + + Preconditions.checkArgument( + !confElements.containsKey(key), + "Conf key element %s already specified", + key); + confElements.put(key, conf.get(key)); + } else { + 
KeyElementType type = KeyElementType.valueOf(trimmed.toUpperCase()); + switch (type) { + case UGI: + case USER_NAME: + Preconditions.checkArgument( + !types.contains(type), + "%s key element already specified", + type.name()); + types.add(type); + break; + default: + throw new RuntimeException("Unknown key element %s" + trimmed); + } + } + } + for (KeyElementType type : types) { + switch (type) { + case UGI: + try { + elements.add(UserGroupInformation.getCurrentUser()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + break; + case USER_NAME: + try { + elements.add(UserGroupInformation.getCurrentUser().getUserName()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + break; + default: + throw new RuntimeException("Unexpected key element " + type.name()); + } + } + for (String key : confElements.keySet()) { + elements.add(ConfElement.of(key, confElements.get(key))); + } + return Key.of(elements); + } + + static class Key { + private final List elements; + + private Key(List elements) { + this.elements = Collections.unmodifiableList(new ArrayList<>(elements)); + } + + public List elements() { + return elements; + } + + public static Key of(List elements) { + return new Key(elements); + } + } + + static class ConfElement { + private final String key; + private final String value; + + private ConfElement(String key, String value) { + this.key = key; + this.value = value; + } + + public String key() { + return key; + } + + @Nullable + public String value() { + return value; + } + + public static ConfElement of(String key, String value) { + return new ConfElement(key, value); + } + } + + private enum KeyElementType { + UGI, + USER_NAME, + CONF + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/HiveClientPool.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/HiveClientPool.java new file mode 100644 index 000000000000..6dd086950771 --- /dev/null +++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/HiveClientPool.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive.pool; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.client.ClientPool; +import org.apache.paimon.hive.RetryingMetaStoreClientFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +/** + * Pool of Hive Metastore clients. + * + *

Mostly copied from iceberg. + */ +public class HiveClientPool extends ClientPool.ClientPoolImpl { + + private final HiveConf hiveConf; + private final String clientClassName; + + public HiveClientPool(int poolSize, Configuration conf, String clientClassName) { + // Do not allow retry by default as we rely on RetryingHiveClient + super(poolSize, TTransportException.class, false); + this.hiveConf = new HiveConf(conf, HiveClientPool.class); + this.hiveConf.addResource(conf); + this.clientClassName = clientClassName; + } + + @Override + protected IMetaStoreClient newClient() { + return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName); + } + + @Override + protected IMetaStoreClient reconnect(IMetaStoreClient client) { + try { + client.close(); + client.reconnect(); + } catch (MetaException e) { + throw new RuntimeException("Failed to reconnect to Hive Metastore", e); + } + return client; + } + + @Override + protected boolean isConnectionException(Exception e) { + return super.isConnectionException(e) + || (e instanceof MetaException + && e.getMessage() + .contains( + "Got exception: org.apache.thrift.transport.TTransportException")); + } + + @Override + protected void close(IMetaStoreClient client) { + client.close(); + } + + @VisibleForTesting + HiveConf hiveConf() { + return hiveConf; + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index f88969de7425..6b13a80e801a 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.options.Options; import 
org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -31,13 +32,11 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import java.lang.reflect.Field; import java.util.Arrays; @@ -64,12 +63,7 @@ public void setUp() throws Exception { HiveConf hiveConf = new HiveConf(); String jdoConnectionURL = "jdbc:derby:memory:" + UUID.randomUUID(); hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true"); - HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf); String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient"; - try (MockedStatic mocked = Mockito.mockStatic(HiveCatalog.class)) { - mocked.when(() -> HiveCatalog.createClient(hiveConf, metastoreClientClass)) - .thenReturn(metaStoreClient); - } catalog = new HiveCatalog(fileIO, hiveConf, metastoreClientClass, warehouse); } @@ -203,10 +197,12 @@ public void testAddHiveTableParameters() { addHiveTableParametersSchema, false); - Field clientField = HiveCatalog.class.getDeclaredField("client"); + Field clientField = HiveCatalog.class.getDeclaredField("clients"); clientField.setAccessible(true); - IMetaStoreClient client = (IMetaStoreClient) clientField.get(catalog); - Table table = client.getTable(databaseName, tableName); + @SuppressWarnings("unchecked") + ClientPool clients = + (ClientPool) clientField.get(catalog); + Table table = clients.run(client -> client.getTable(databaseName, tableName)); Map tableProperties = table.getParameters(); // Verify the transformed parameters @@ -258,10 +254,12 @@ public void 
testAlterHiveTableParameters() { Arrays.asList(schemaChange1, schemaChange2), false); - Field clientField = HiveCatalog.class.getDeclaredField("client"); + Field clientField = HiveCatalog.class.getDeclaredField("clients"); clientField.setAccessible(true); - IMetaStoreClient client = (IMetaStoreClient) clientField.get(catalog); - Table table = client.getTable(databaseName, tableName); + @SuppressWarnings("unchecked") + ClientPool clients = + (ClientPool) clientField.get(catalog); + Table table = clients.run(client -> client.getTable(databaseName, tableName)); Map tableProperties = table.getParameters(); assertThat(tableProperties).containsEntry("table.owner", "Hms"); diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestHiveClientPool.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestHiveClientPool.java new file mode 100644 index 000000000000..a5daefae7940 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestHiveClientPool.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.hive.pool; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.thrift.transport.TTransportException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Test Hive Client Pool. + * + *

Mostly copied from iceberg. + */ +public class TestHiveClientPool { + + private static final String HIVE_SITE_CONTENT = + "\n" + + "\n" + + "\n" + + " \n" + + " hive.metastore.sasl.enabled\n" + + " true\n" + + " \n" + + "\n"; + + HiveClientPool clients; + + @BeforeEach + public void before() { + String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient"; + HiveClientPool clientPool = + new HiveClientPool(2, new Configuration(), metastoreClientClass); + clients = Mockito.spy(clientPool); + } + + @AfterEach + public void after() { + clients.close(); + clients = null; + } + + @Test + public void testConf() { + HiveConf conf = createHiveConf(); + conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "warehouse"); + + String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient"; + HiveClientPool clientPool = new HiveClientPool(10, conf, metastoreClientClass); + HiveConf clientConf = clientPool.hiveConf(); + + assertThat(clientConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)) + .isEqualTo(conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)); + assertThat(clientPool.poolSize()).isEqualTo(10); + + // 'hive.metastore.sasl.enabled' should be 'true' as defined in xml + assertThat(clientConf.get(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname)) + .isEqualTo(conf.get(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname)); + assertThat(clientConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL)).isTrue(); + } + + private HiveConf createHiveConf() { + HiveConf hiveConf = new HiveConf(); + try (InputStream inputStream = + new ByteArrayInputStream(HIVE_SITE_CONTENT.getBytes(StandardCharsets.UTF_8))) { + hiveConf.addResource(inputStream, "for_test"); + } catch (IOException e) { + throw new RuntimeException(e); + } + return hiveConf; + } + + @Test + public void testNewClientFailure() { + Mockito.doThrow(new RuntimeException("Connection exception")).when(clients).newClient(); + assertThatThrownBy(() -> 
clients.run(Object::toString)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Connection exception"); + } + + @Test + public void testGetTablesFailsForNonReconnectableException() throws Exception { + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + Mockito.doReturn(hmsClient).when(clients).newClient(); + Mockito.doThrow(new MetaException("Another meta exception")) + .when(hmsClient) + .getTables(Mockito.anyString(), Mockito.anyString()); + assertThatThrownBy(() -> clients.run(client -> client.getTables("default", "t"))) + .isInstanceOf(MetaException.class) + .hasMessage("Another meta exception"); + } + + @Test + public void testConnectionFailureRestoreForMetaException() throws Exception { + HiveMetaStoreClient hmsClient = newClient(); + + // Throwing an exception may trigger the client to reconnect. + String metaMessage = "Got exception: org.apache.thrift.transport.TTransportException"; + Mockito.doThrow(new MetaException(metaMessage)).when(hmsClient).getAllDatabases(); + + // Create a new client when the reconnect method is called. + HiveMetaStoreClient newClient = reconnect(hmsClient); + + List databases = Lists.newArrayList("db1", "db2"); + + Mockito.doReturn(databases).when(newClient).getAllDatabases(); + // The return is OK when the reconnect method is called. + assertThat((List) clients.run(client -> client.getAllDatabases(), true)) + .isEqualTo(databases); + + // Verify that the method is called. + Mockito.verify(clients).reconnect(hmsClient); + Mockito.verify(clients, Mockito.never()).reconnect(newClient); + } + + @Test + public void testConnectionFailureRestoreForTTransportException() throws Exception { + HiveMetaStoreClient hmsClient = newClient(); + Mockito.doThrow(new TTransportException()).when(hmsClient).getAllFunctions(); + + // Create a new client when getAllFunctions() failed. 
+ HiveMetaStoreClient newClient = reconnect(hmsClient); + + GetAllFunctionsResponse response = new GetAllFunctionsResponse(); + response.addToFunctions( + new Function( + "concat", + "db1", + "classname", + "root", + PrincipalType.USER, + 100, + FunctionType.JAVA, + null)); + Mockito.doReturn(response).when(newClient).getAllFunctions(); + assertThat((GetAllFunctionsResponse) clients.run(client -> client.getAllFunctions(), true)) + .isEqualTo(response); + + Mockito.verify(clients).reconnect(hmsClient); + Mockito.verify(clients, Mockito.never()).reconnect(newClient); + } + + private HiveMetaStoreClient newClient() { + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + Mockito.doReturn(hmsClient).when(clients).newClient(); + return hmsClient; + } + + private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) { + HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class); + Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient); + return newClient; + } +} diff --git a/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java b/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java index 7364f3598716..0cdbb3197b01 100644 --- a/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java +++ b/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java @@ -217,7 +217,6 @@ private void initConf(HiveConf conf, int port) { conf.set( HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false"); - conf.set("iceberg.hive.client-pool-size", "2"); conf.set( HiveConf.ConfVars.HIVE_IN_TEST.varname, HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue());