From 1a527f31bd1aa6f6a423d7d892fc1780b33c3594 Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Mon, 25 Mar 2024 18:34:16 +0800
Subject: [PATCH] HiveCatalog supports client pool

---
 .../generated/catalog_configuration.html      |  12 ++
 .../generated/jdbc_catalog_configuration.html |   6 -
 .../org/apache/paimon/client/ClientPool.java  |  68 +++++++-
 .../apache/paimon/options/CatalogOptions.java |  12 ++
 .../org/apache/paimon/jdbc/JdbcCatalog.java   |   3 +-
 .../paimon/jdbc/JdbcCatalogFactory.java       |   3 +-
 .../paimon/jdbc/JdbcCatalogOptions.java       |   6 -
 .../paimon/hive/HiveCachedClientPool.java     |  99 +++++++++
 .../org/apache/paimon/hive/HiveCatalog.java   | 155 +++++++++++++-----
 .../apache/paimon/hive/HiveCatalogLock.java   |  31 +++-
 .../paimon/hive/HiveCatalogLockContext.java   |  16 +-
 .../paimon/hive/HiveCatalogLockFactory.java   |   4 +-
 .../apache/paimon/hive/HiveClientPool.java    |  72 ++++++++
 .../paimon/hive/HiveMetastoreClient.java      |  66 +++++---
 .../paimon/hive/migrate/HiveMigrator.java     |  67 +++++---
 .../apache/paimon/hive/HiveCatalogTest.java   |  27 +--
 .../apache/paimon/hive/HiveLocationTest.java  |  15 +-
 17 files changed, 532 insertions(+), 130 deletions(-)
 create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCachedClientPool.java
 create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveClientPool.java

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index cab6e731e851..94f456843fce 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,6 +26,18 @@
+        <tr>
+            <td><h5>catalog-key</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Custom catalog key.</td>
+        </tr>
+        <tr>
+            <td><h5>client-pool-cache-eviction-interval-ms</h5></td>
+            <td style="word-wrap: break-word;">300000</td>
+            <td>Long</td>
+            <td>Client pool cache eviction interval ms.</td>
+        </tr>
         <tr>
             <td><h5>client-pool-size</h5></td>
             <td style="word-wrap: break-word;">2</td>
diff --git a/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
index c2d6e287f31e..14637897d946 100644
--- a/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
@@ -26,12 +26,6 @@
-        <tr>
-            <td><h5>catalog-key</h5></td>
-            <td style="word-wrap: break-word;">"jdbc"</td>
-            <td>String</td>
-            <td>Custom jdbc catalog store key.</td>
-        </tr>
         <tr>
             <td><h5>lock-key-max-length</h5></td>
255 diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java index deddf75a2452..34c59494653b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java @@ -18,12 +18,21 @@ package org.apache.paimon.client; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; import java.util.ArrayDeque; import java.util.Deque; +import java.util.List; import static org.apache.paimon.utils.Preconditions.checkState; @@ -43,7 +52,8 @@ interface Action { R run(Action action, boolean retry) throws E, InterruptedException; /** Default implementation for {@link ClientPool}. */ - abstract class ClientPoolImpl implements Closeable, ClientPool { + abstract class ClientPoolImpl + implements Closeable, Serializable, ClientPool { private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class); private final int poolSize; @@ -169,4 +179,60 @@ public boolean isClosed() { return closed; } } + + /** Cached client pool for {@link ClientPool}. */ + abstract class CachedClientPool + implements Closeable, Serializable, ClientPool { + + protected static final String CONF_KEY_PREFIX = "confKey:"; + protected final long evictionInterval; + protected final String key; + protected final String metadata; + private final Options options; + + public CachedClientPool(Options options) { + this.options = options; + this.evictionInterval = + options.get(CatalogOptions.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS); + this.metadata = options.get(CatalogOptions.METASTORE); + this.key = extractKey(options); + init(); + } + + protected Options options() { + return options; + } + + protected abstract void init(); + + protected abstract ClientPool clientPool(); + + @Override + public R run(Action action) throws E, InterruptedException { + return clientPool().run(action); + } + + @Override + public R run(Action action, boolean retry) throws E, InterruptedException { + return clientPool().run(action, retry); + } + + private String extractKey(Options options) { + List elements = Lists.newArrayList(); + elements.add(options.get(CatalogOptions.URI)); + String metastore = options.get(CatalogOptions.METASTORE); + elements.add(metastore); + String catalogKey = options.getOptional(CatalogOptions.CATALOG_KEY).orElse(metastore); + elements.add(catalogKey); + elements.addAll(extractKeyElement()); + return CONF_KEY_PREFIX.concat(StringUtils.join(elements, ".")); + } + + protected abstract List extractKeyElement(); + + @Override + public void close() throws IOException { + // Do nothing, will automatically clean up + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index f00a35a75094..ab31784602db 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -110,4 +110,16 @@ public class CatalogOptions { TextElement.text( "\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized 
storage.")) .build()); + + public static final ConfigOption CATALOG_KEY = + ConfigOptions.key("catalog-key") + .stringType() + .noDefaultValue() + .withDescription("Custom catalog key."); + + public static final ConfigOption CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS = + ConfigOptions.key("client-pool-cache-eviction-interval-ms") + .longType() + .defaultValue(5 * 60 * 1000L) + .withDescription("Client pool cache eviction interval ms."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 7e7718b5bee9..faa18915a0d4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -33,6 +33,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -77,7 +78,7 @@ public class JdbcCatalog extends AbstractCatalog { protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, String warehouse) { super(fileIO, options); - this.catalogKey = catalogKey; + this.catalogKey = StringUtils.isBlank(catalogKey) ? "jdbc" : catalogKey; this.options = options; this.warehouse = warehouse; Preconditions.checkNotNull(options, "Invalid catalog properties: null"); diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java index 6c3c1d0e41cc..26e334ff7d7d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; /** Factory to create {@link JdbcCatalog}. */ @@ -38,7 +39,7 @@ public String identifier() { @Override public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { Options options = context.options(); - String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY); + String catalogKey = options.get(CatalogOptions.CATALOG_KEY); return new JdbcCatalog(fileIO, catalogKey, context.options(), warehouse.toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java index 407dbbc3bf5b..44ba2d729fea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java @@ -25,12 +25,6 @@ /** Options for jdbc catalog. 
*/ public final class JdbcCatalogOptions { - public static final ConfigOption CATALOG_KEY = - ConfigOptions.key("catalog-key") - .stringType() - .defaultValue("jdbc") - .withDescription("Custom jdbc catalog store key."); - public static final ConfigOption LOCK_KEY_MAX_LENGTH = ConfigOptions.key("lock-key-max-length") .intType() diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCachedClientPool.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCachedClientPool.java new file mode 100644 index 000000000000..0e55ba323ecd --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCachedClientPool.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.client.ClientPool; +import org.apache.paimon.options.Options; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.thrift.TException; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Cache HiveClientPool, share connection pool between multiple tasks to prevent excessive + * MetadataClient requests. 
+ */ +public class HiveCachedClientPool + extends ClientPool.CachedClientPool { + + protected static Cache clientPoolCache; + private final SerializableHiveConf hiveConf; + private final String clientClassName; + private final int poolSize; + + public HiveCachedClientPool( + int poolSize, SerializableHiveConf hiveConf, String clientClassName, Options options) { + super(options); + this.poolSize = poolSize; + this.hiveConf = hiveConf; + this.clientClassName = clientClassName; + } + + @Override + protected synchronized void init() { + if (clientPoolCache == null) { + clientPoolCache = + Caffeine.newBuilder() + .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) + .removalListener( + (ignored, value, cause) -> ((ClientPoolImpl) value).close()) + .scheduler( + Scheduler.forScheduledExecutorService( + new ScheduledThreadPoolExecutor( + 1, + new ThreadFactory() { + final ThreadFactory defaultFactory = + Executors.defaultThreadFactory(); + + @Override + public Thread newThread(Runnable r) { + Thread thread = + defaultFactory.newThread(r); + thread.setDaemon(true); + return thread; + } + }))) + .build(); + } + } + + @Override + public HiveClientPool clientPool() { + return (HiveClientPool) + clientPoolCache.get( + key, k -> new HiveClientPool(poolSize, hiveConf, clientClassName)); + } + + @Override + protected List extractKeyElement() { + List elements = Lists.newArrayList(); + elements.add(options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS)); + return elements; + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 372bfedefb88..8e51de72acea 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -26,6 +26,7 @@ import org.apache.paimon.catalog.CatalogLockContext; import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.metastore.MetastoreClient; @@ -112,12 +113,13 @@ public class HiveCatalog extends AbstractCatalog { public static final String HIVE_SITE_FILE = "hive-site.xml"; private final HiveConf hiveConf; - private final String clientClassName; - private final IMetaStoreClient client; + private final ClientPool clients; private final String warehouse; private final LocationHelper locationHelper; + private final String clientClassName; + public HiveCatalog(FileIO fileIO, HiveConf hiveConf, String clientClassName, String warehouse) { this(fileIO, hiveConf, clientClassName, new Options(), warehouse); } @@ -130,9 +132,8 @@ public HiveCatalog( String warehouse) { super(fileIO, options); this.hiveConf = hiveConf; - this.clientClassName = clientClassName; this.warehouse = warehouse; - + this.clientClassName = clientClassName; boolean needLocationInProperties = hiveConf.getBoolean( LOCATION_IN_PROPERTIES.key(), LOCATION_IN_PROPERTIES.defaultValue()); @@ -144,7 +145,14 @@ public HiveCatalog( locationHelper = new StorageLocationHelper(); } - this.client = createClient(hiveConf, clientClassName); + int clientPoolSize = options.get(CatalogOptions.CLIENT_POOL_SIZE); + this.clients = createClients(hiveConf, clientClassName, options, clientPoolSize); + } + + public static HiveCachedClientPool createClients( + HiveConf hiveConf, String 
clientClassName, Options options, int clientPoolSize) { + return new HiveCachedClientPool( + clientPoolSize, new SerializableHiveConf(hiveConf), clientClassName, options); } @Override @@ -156,7 +164,7 @@ public Optional defaultLockFactory() { public Optional lockContext() { return Optional.of( new HiveCatalogLockContext( - new SerializableHiveConf(hiveConf), clientClassName, catalogOptions)); + clients, new SerializableHiveConf(hiveConf), catalogOptions)); } @Override @@ -164,7 +172,11 @@ public Optional metastoreClientFactory(Identifier ident try { return Optional.of( new HiveMetastoreClient.Factory( - identifier, getDataTableSchema(identifier), hiveConf, clientClassName)); + new SerializableHiveConf(hiveConf), + identifier, + getDataTableSchema(identifier), + clientClassName, + catalogOptions)); } catch (TableNotExistException e) { throw new RuntimeException( "Table " + identifier + " does not exist. This is unexpected.", e); @@ -176,9 +188,10 @@ public Path getDataTableLocation(Identifier identifier) { try { String databaseName = identifier.getDatabaseName(); String tableName = identifier.getObjectName(); - if (client.tableExists(databaseName, tableName)) { + if (clients.run(client -> client.tableExists(databaseName, tableName))) { String location = - locationHelper.getTableLocation(client.getTable(databaseName, tableName)); + locationHelper.getTableLocation( + clients.run(client -> client.getTable(databaseName, tableName))); if (location != null) { return new Path(location); } @@ -186,7 +199,8 @@ public Path getDataTableLocation(Identifier identifier) { // If the table does not exist, // we should use the database path to generate the table path. String dbLocation = - locationHelper.getDatabaseLocation(client.getDatabase(databaseName)); + locationHelper.getDatabaseLocation( + clients.run(client -> client.getDatabase(databaseName))); if (dbLocation != null) { return new Path(dbLocation, tableName); } @@ -195,28 +209,35 @@ public Path getDataTableLocation(Identifier identifier) { return super.getDataTableLocation(identifier); } catch (TException e) { throw new RuntimeException("Can not get table " + identifier + " from metastore.", e); + } catch (InterruptedException e) { + throw convertedInterruptedException( + "Interrupted in call to get data table location", e); } } @Override public List listDatabases() { try { - return client.getAllDatabases(); + return clients.run(client -> client.getAllDatabases()); } catch (TException e) { throw new RuntimeException("Failed to list all databases", e); + } catch (InterruptedException e) { + throw convertedInterruptedException("Interrupted in call to list all database", e); } } @Override protected boolean databaseExistsImpl(String databaseName) { try { - client.getDatabase(databaseName); + clients.run(client -> client.getDatabase(databaseName)); return true; } catch (NoSuchObjectException e) { return false; } catch (TException e) { throw new RuntimeException( "Failed to determine if database " + databaseName + " exists", e); + } catch (InterruptedException e) { + throw convertedInterruptedException("Interrupted in call to get database", e); } } @@ -230,9 +251,15 @@ protected void createDatabaseImpl(String name, Map properties) { : new Path(database.getLocationUri()); locationHelper.createPathIfRequired(databasePath, fileIO); locationHelper.specifyDatabaseLocation(databasePath, database); - client.createDatabase(database); + clients.run( + client -> { + client.createDatabase(database); + return null; + }); } catch (TException | IOException e) { throw 
new RuntimeException("Failed to create database " + name, e); + } catch (InterruptedException e) { + throw convertedInterruptedException("Interrupted in call to create database", e); } } @@ -257,10 +284,13 @@ private Database convertToHiveDatabase(String name, Map properti @Override public Map loadDatabasePropertiesImpl(String name) { try { - return convertToProperties(client.getDatabase(name)); + return convertToProperties(clients.run(client -> client.getDatabase(name))); } catch (TException e) { throw new RuntimeException( String.format("Failed to get database %s properties", name), e); + } catch (InterruptedException e) { + throw convertedInterruptedException( + "Interrupted in call to load database properties", e); } } @@ -278,19 +308,25 @@ private Map convertToProperties(Database database) { @Override protected void dropDatabaseImpl(String name) { try { - Database database = client.getDatabase(name); + Database database = clients.run(client -> client.getDatabase(name)); String location = locationHelper.getDatabaseLocation(database); locationHelper.dropPathIfRequired(new Path(location), fileIO); - client.dropDatabase(name, true, false, true); + clients.run( + client -> { + client.dropDatabase(name, true, false, true); + return null; + }); } catch (TException | IOException e) { throw new RuntimeException("Failed to drop database " + name, e); + } catch (InterruptedException e) { + throw convertedInterruptedException("Interrupted in call to drop database", e); } } @Override protected List listTablesImpl(String databaseName) { try { - return client.getAllTables(databaseName).stream() + return clients.run(client -> client.getAllTables(databaseName)).stream() .filter( tableName -> { Identifier identifier = new Identifier(databaseName, tableName); @@ -299,6 +335,8 @@ protected List listTablesImpl(String databaseName) { .collect(Collectors.toList()); } catch (TException e) { throw new RuntimeException("Failed to list all tables in database " + databaseName, e); + } catch (InterruptedException e) { + throw convertedInterruptedException("Interrupted in call to list tables", e); } } @@ -310,13 +348,20 @@ public boolean tableExists(Identifier identifier) { Table table; try { - table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); + table = + clients.run( + client -> + client.getTable( + identifier.getDatabaseName(), + identifier.getObjectName())); } catch (NoSuchObjectException e) { return false; } catch (TException e) { throw new RuntimeException( "Cannot determine if table " + identifier.getFullName() + " is a paimon table.", e); + } catch (InterruptedException e) { + throw convertedInterruptedException("Interrupted in call to get table", e); } return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table); @@ -342,8 +387,16 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis @Override protected void dropTableImpl(Identifier identifier) { try { - client.dropTable( - identifier.getDatabaseName(), identifier.getObjectName(), true, false, true); + clients.run( + client -> { + client.dropTable( + identifier.getDatabaseName(), + identifier.getObjectName(), + true, + false, + true); + return null; + }); // When drop a Hive external table, only the hive metadata is deleted and the data files // are not deleted. 
@@ -368,6 +421,8 @@ protected void dropTableImpl(Identifier identifier) { } } catch (TException e) { throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e); + } catch (InterruptedException e) { + throw convertedInterruptedException("Interrupted in call to drop table", e); } } @@ -392,7 +447,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) { convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX)); try { updateHmsTable(table, identifier, tableSchema); - client.createTable(table); + clients.run( + client -> { + client.createTable(table); + return null; + }); } catch (Exception e) { Path path = getDataTableLocation(identifier); try { @@ -409,10 +468,14 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { String fromDB = fromTable.getDatabaseName(); String fromTableName = fromTable.getObjectName(); - Table table = client.getTable(fromDB, fromTableName); + Table table = clients.run(client -> client.getTable(fromDB, fromTableName)); table.setDbName(toTable.getDatabaseName()); table.setTableName(toTable.getObjectName()); - client.alter_table(fromDB, fromTableName, table); + clients.run( + client -> { + client.alter_table(fromDB, fromTableName, table); + return null; + }); Path fromPath = getDataTableLocation(fromTable); if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) { @@ -431,10 +494,17 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { // update location locationHelper.specifyTableLocation(table, toPath.toString()); - client.alter_table(toTable.getDatabaseName(), toTable.getObjectName(), table); + clients.run( + client -> { + client.alter_table( + toTable.getDatabaseName(), toTable.getObjectName(), table); + return null; + }); } } catch (TException e) { throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @@ -448,11 +518,23 @@ protected void alterTableImpl(Identifier identifier, List changes) try { // sync to hive hms - Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); + Table table = + clients.run( + client -> + client.getTable( + identifier.getDatabaseName(), + identifier.getObjectName())); updateHmsTablePars(table, schema); updateHmsTable(table, identifier, schema); - client.alter_table( - identifier.getDatabaseName(), identifier.getObjectName(), table, true); + clients.run( + client -> { + client.alter_table( + identifier.getDatabaseName(), + identifier.getObjectName(), + table, + true); + return null; + }); } catch (Exception te) { schemaManager.deleteSchema(schema.id()); throw new RuntimeException(te); @@ -465,9 +547,7 @@ public boolean caseSensitive() { } @Override - public void close() throws Exception { - client.close(); - } + public void close() throws Exception {} @Override public String warehouse() { @@ -569,8 +649,8 @@ private void updateHmsTablePars(Table table, TableSchema schema) { } @VisibleForTesting - public IMetaStoreClient getHmsClient() { - return client; + public ClientPool getHmsClient() { + return clients; } private FieldSchema convertToFieldSchema(DataField dataField) { @@ -591,14 +671,10 @@ private Lock lock(Identifier identifier) { } HiveCatalogLock lock = - new HiveCatalogLock(client, checkMaxSleep(hiveConf), acquireTimeout(hiveConf)); + new HiveCatalogLock(clients, checkMaxSleep(hiveConf), acquireTimeout(hiveConf)); return Lock.fromCatalog(lock, identifier); } - static IMetaStoreClient 
createClient(HiveConf hiveConf, String clientClassName) { - return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName); - } - public static HiveConf createHiveConf( @Nullable String hiveConfDir, @Nullable String hadoopConfDir, @@ -758,4 +834,9 @@ public static Configuration getHadoopConfiguration(String hadoopConfDir) { public static String possibleHiveConfPath() { return System.getenv("HIVE_CONF_DIR"); } + + private RuntimeException convertedInterruptedException(String message, InterruptedException e) { + Thread.currentThread().interrupt(); + return new RuntimeException(message, e); + } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java index 8c3d3829ea28..1a076bfa57cc 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java @@ -19,6 +19,7 @@ package org.apache.paimon.hive; import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.utils.TimeUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -45,12 +46,15 @@ public class HiveCatalogLock implements CatalogLock { static final String LOCK_IDENTIFIER = "hive"; - private final IMetaStoreClient client; + private final ClientPool clients; private final long checkMaxSleep; private final long acquireTimeout; - public HiveCatalogLock(IMetaStoreClient client, long checkMaxSleep, long acquireTimeout) { - this.client = client; + public HiveCatalogLock( + ClientPool clients, + long checkMaxSleep, + long acquireTimeout) { + this.clients = clients; this.checkMaxSleep = checkMaxSleep; this.acquireTimeout = acquireTimeout; } @@ -76,7 +80,8 @@ private long lock(String database, String table) Collections.singletonList(lockComponent), System.getProperty("user.name"), InetAddress.getLocalHost().getHostName()); - LockResponse lockResponse = this.client.lock(lockRequest); + LockResponse lockResponse = clients.run(client -> client.lock(lockRequest)); + long lockId = lockResponse.getLockid(); long nextSleep = 50; long startRetry = System.currentTimeMillis(); @@ -87,7 +92,7 @@ private long lock(String database, String table) } Thread.sleep(nextSleep); - lockResponse = client.checkLock(lockResponse.getLockid()); + lockResponse = clients.run(client -> client.checkLock(lockId)); if (System.currentTimeMillis() - startRetry > acquireTimeout) { break; } @@ -96,7 +101,11 @@ private long lock(String database, String table) if (lockResponse.getState() != LockState.ACQUIRED) { if (lockResponse.getState() == LockState.WAITING) { - client.unlock(lockResponse.getLockid()); + clients.run( + client -> { + client.unlock(lockId); + return null; + }); } throw new RuntimeException( "Acquire lock failed with time: " + Duration.ofMillis(retryDuration)); @@ -104,13 +113,17 @@ private long lock(String database, String table) return lockResponse.getLockid(); } - private void unlock(long lockId) throws TException { - client.unlock(lockId); + private void unlock(long lockId) throws TException, InterruptedException { + clients.run( + client -> { + client.unlock(lockId); + return null; + }); } @Override public void close() { - this.client.close(); + /** Do nothing. 
*/ } public static long checkMaxSleep(HiveConf conf) { diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java index ecffd7f1e633..60ae93457431 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java @@ -19,19 +19,25 @@ package org.apache.paimon.hive; import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.options.Options; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.thrift.TException; + /** Hive {@link CatalogLockContext}. */ public class HiveCatalogLockContext implements CatalogLockContext { private final SerializableHiveConf hiveConf; - private final String clientClassName; private final Options options; + private final ClientPool clients; public HiveCatalogLockContext( - SerializableHiveConf hiveConf, String clientClassName, Options options) { + ClientPool clients, + SerializableHiveConf hiveConf, + Options options) { + this.clients = clients; this.hiveConf = hiveConf; - this.clientClassName = clientClassName; this.options = options; } @@ -44,7 +50,7 @@ public SerializableHiveConf hiveConf() { return hiveConf; } - public String clientClassName() { - return clientClassName; + public ClientPool getClients() { + return clients; } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java index 7c05ce3ee520..b749e2f661f3 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java @@ -40,9 +40,7 @@ public CatalogLock createLock(CatalogLockContext context) { HiveCatalogLockContext hiveLockContext = (HiveCatalogLockContext) context; HiveConf conf = hiveLockContext.hiveConf().conf(); return new HiveCatalogLock( - HiveCatalog.createClient(conf, hiveLockContext.clientClassName()), - checkMaxSleep(conf), - acquireTimeout(conf)); + hiveLockContext.getClients(), checkMaxSleep(conf), acquireTimeout(conf)); } @Override diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveClientPool.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveClientPool.java new file mode 100644 index 000000000000..06ede7bbdcad --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveClientPool.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.client.ClientPool; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +/** Client pool for hive. */ +public class HiveClientPool extends ClientPool.ClientPoolImpl { + + private final SerializableHiveConf hiveConf; + private final String clientClassName; + private final RetryingMetaStoreClientFactory retryingMetaStoreClientFactory; + + public HiveClientPool(int poolSize, SerializableHiveConf hiveConf, String clientClassName) { + super(poolSize, TTransportException.class, true); + this.hiveConf = hiveConf; + this.clientClassName = clientClassName; + this.retryingMetaStoreClientFactory = new RetryingMetaStoreClientFactory(); + } + + @Override + protected IMetaStoreClient newClient() { + return retryingMetaStoreClientFactory.createClient(hiveConf.conf(), clientClassName); + } + + @Override + protected IMetaStoreClient reconnect(IMetaStoreClient client) { + try { + client.close(); + client.reconnect(); + } catch (MetaException e) { + throw new RuntimeException("Failed to reconnect to Hive Metastore", e); + } + return client; + } + + @Override + protected boolean isConnectionException(Exception e) { + return super.isConnectionException(e) + || (e != null + && e instanceof MetaException + && e.getMessage() + .contains( + "Got exception: org.apache.thrift.transport.TTransportException")); + } + + @Override + protected void close(IMetaStoreClient client) { + client.close(); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index 031b1848a01e..19efc613e892 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -20,17 +20,20 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.utils.PartitionPathUtils; import org.apache.paimon.utils.RowDataPartitionComputer; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.thrift.TException; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -42,10 +45,13 @@ public class HiveMetastoreClient implements MetastoreClient { private final Identifier identifier; private final RowDataPartitionComputer partitionComputer; - private final IMetaStoreClient client; + private final ClientPool clients; private final StorageDescriptor sd; - private HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client) + private HiveMetastoreClient( + Identifier identifier, + TableSchema schema, + ClientPool clients) throws 
Exception { this.identifier = identifier; this.partitionComputer = @@ -54,8 +60,14 @@ private HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStor schema.logicalPartitionType(), schema.partitionKeys().toArray(new String[0])); - this.client = client; - this.sd = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()).getSd(); + this.clients = clients; + this.sd = + clients.run( + client -> + client.getTable( + identifier.getDatabaseName(), + identifier.getObjectName()) + .getSd()); } @Override @@ -67,8 +79,12 @@ public void addPartition(BinaryRow partition) throws Exception { public void addPartition(LinkedHashMap partitionSpec) throws Exception { List partitionValues = new ArrayList<>(partitionSpec.values()); try { - client.getPartition( - identifier.getDatabaseName(), identifier.getObjectName(), partitionValues); + clients.run( + client -> + client.getPartition( + identifier.getDatabaseName(), + identifier.getObjectName(), + partitionValues)); // do nothing if the partition already exists } catch (NoSuchObjectException e) { // partition not found, create new partition @@ -87,7 +103,7 @@ public void addPartition(LinkedHashMap partitionSpec) throws Exc hivePartition.setCreateTime(currentTime); hivePartition.setLastAccessTime(currentTime); - client.add_partition(hivePartition); + clients.run(client -> client.add_partition(hivePartition)); } } @@ -95,11 +111,13 @@ public void addPartition(LinkedHashMap partitionSpec) throws Exc public void deletePartition(LinkedHashMap partitionSpec) throws Exception { List partitionValues = new ArrayList<>(partitionSpec.values()); try { - client.dropPartition( - identifier.getDatabaseName(), - identifier.getObjectName(), - partitionValues, - false); + clients.run( + client -> + client.dropPartition( + identifier.getDatabaseName(), + identifier.getObjectName(), + partitionValues, + false)); } catch (NoSuchObjectException e) { // do nothing if the partition not exists } @@ -107,7 +125,7 @@ public void deletePartition(LinkedHashMap partitionSpec) throws @Override public void close() throws Exception { - client.close(); + // Do nothing } /** Factory to create {@link HiveMetastoreClient}. 
*/ @@ -115,28 +133,36 @@ public static class Factory implements MetastoreClient.Factory { private static final long serialVersionUID = 1L; + private final SerializableHiveConf hiveConf; private final Identifier identifier; private final TableSchema schema; - private final SerializableHiveConf hiveConf; private final String clientClassName; + private final Options options; public Factory( + SerializableHiveConf hiveConf, Identifier identifier, TableSchema schema, - HiveConf hiveConf, - String clientClassName) { + String clientClassName, + Options options) { + this.hiveConf = hiveConf; this.identifier = identifier; this.schema = schema; - this.hiveConf = new SerializableHiveConf(hiveConf); this.clientClassName = clientClassName; + this.options = options; } @Override public MetastoreClient create() { - HiveConf conf = hiveConf.conf(); try { return new HiveMetastoreClient( - identifier, schema, HiveCatalog.createClient(conf, clientClassName)); + identifier, + schema, + HiveCatalog.createClients( + hiveConf.conf(), + clientClassName, + options, + options.get(CatalogOptions.CLIENT_POOL_SIZE))); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java index fd31ec3ec0d7..89026252fc63 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryWriter; import org.apache.paimon.fs.FileIO; @@ -72,7 +73,7 @@ public class HiveMigrator implements Migrator { private final FileIO fileIO; private final HiveCatalog hiveCatalog; - private final IMetaStoreClient client; + private final ClientPool clients; private final String sourceDatabase; private final String sourceTable; private final String targetDatabase; @@ -88,7 +89,7 @@ public HiveMigrator( Map options) { this.hiveCatalog = hiveCatalog; this.fileIO = hiveCatalog.fileIO(); - this.client = hiveCatalog.getHmsClient(); + this.clients = hiveCatalog.getHmsClient(); this.sourceDatabase = sourceDatabase; this.sourceTable = sourceTable; this.targetDatabase = targetDatabase; @@ -98,31 +99,35 @@ public HiveMigrator( public static List databaseMigrators( HiveCatalog hiveCatalog, String sourceDatabase, Map options) { - IMetaStoreClient client = hiveCatalog.getHmsClient(); + ClientPool clients = hiveCatalog.getHmsClient(); try { - return client.getAllTables(sourceDatabase).stream() - .map( - sourceTable -> - new HiveMigrator( - hiveCatalog, - sourceDatabase, - sourceTable, - sourceDatabase, - sourceTable + PAIMON_SUFFIX, - options)) - .collect(Collectors.toList()); + return clients.run( + client -> + client.getAllTables(sourceDatabase).stream() + .map( + sourceTable -> + new HiveMigrator( + hiveCatalog, + sourceDatabase, + sourceTable, + sourceDatabase, + sourceTable + PAIMON_SUFFIX, + options)) + .collect(Collectors.toList())); } catch (TException e) { throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @Override public void executeMigrate() throws Exception { - if (!client.tableExists(sourceDatabase, sourceTable)) { + if (!clients.run(client -> 
client.tableExists(sourceDatabase, sourceTable))) { throw new RuntimeException("Source hive table does not exist"); } - Table sourceHiveTable = client.getTable(sourceDatabase, sourceTable); + Table sourceHiveTable = clients.run(client -> client.getTable(sourceDatabase, sourceTable)); Map properties = new HashMap<>(sourceHiveTable.getParameters()); checkPrimaryKey(); @@ -132,7 +137,7 @@ public void executeMigrate() throws Exception { if (!alreadyExist) { Schema schema = from( - client.getSchema(sourceDatabase, sourceTable), + clients.run(client -> client.getSchema(sourceDatabase, sourceTable)), sourceHiveTable.getPartitionKeys(), properties); hiveCatalog.createTable(identifier, schema, false); @@ -143,7 +148,10 @@ public void executeMigrate() throws Exception { checkPaimonTable(paimonTable); List partitionsNames = - client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE); + clients.run( + client -> + client.listPartitionNames( + sourceDatabase, sourceTable, Short.MAX_VALUE)); checkCompatible(sourceHiveTable, paimonTable); List tasks = new ArrayList<>(); @@ -155,7 +163,7 @@ public void executeMigrate() throws Exception { } else { tasks.addAll( importPartitionedTableTask( - client, + clients, fileIO, partitionsNames, sourceHiveTable, @@ -203,7 +211,11 @@ public void executeMigrate() throws Exception { } // if all success, drop the origin table - client.dropTable(sourceDatabase, sourceTable, true, true); + clients.run( + client -> { + client.dropTable(sourceDatabase, sourceTable, true, true); + return null; + }); } @Override @@ -216,7 +228,7 @@ public void renameTable(boolean ignoreIfNotExists) throws Exception { private void checkPrimaryKey() throws Exception { PrimaryKeysRequest primaryKeysRequest = new PrimaryKeysRequest(sourceDatabase, sourceTable); - if (!client.getPrimaryKeys(primaryKeysRequest).isEmpty()) { + if (!clients.run(client -> client.getPrimaryKeys(primaryKeysRequest).isEmpty())) { throw new IllegalArgumentException("Can't migrate primary key table yet."); } } @@ -264,7 +276,7 @@ public Schema from( } private List importPartitionedTableTask( - IMetaStoreClient client, + ClientPool clients, FileIO fileIO, List partitionNames, Table sourceTable, @@ -283,9 +295,14 @@ private List importPartitionedTableTask( for (String partitionName : partitionNames) { Partition partition = - client.getPartition( - sourceTable.getDbName(), sourceTable.getTableName(), partitionName); - Map values = client.partitionNameToSpec(partitionName); + clients.run( + client -> + client.getPartition( + sourceTable.getDbName(), + sourceTable.getTableName(), + partitionName)); + Map values = + clients.run(client -> client.partitionNameToSpec(partitionName)); String format = parseFormat(partition.getSd().getSerdeInfo().toString()); String location = partition.getSd().getLocation(); BinaryRow partitionRow = diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index f88969de7425..9a676d8fbef6 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import 
org.apache.paimon.schema.SchemaChange; @@ -34,10 +35,9 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import java.lang.reflect.Field; import java.util.Arrays; @@ -66,10 +66,11 @@ public void setUp() throws Exception { hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true"); HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf); String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient"; - try (MockedStatic mocked = Mockito.mockStatic(HiveCatalog.class)) { - mocked.when(() -> HiveCatalog.createClient(hiveConf, metastoreClientClass)) - .thenReturn(metaStoreClient); - } + + // try (MockedStatic mocked = Mockito.mockStatic(HiveCatalog.class)) { + // mocked.when(() -> HiveCatalog.createClient(hiveConf, metastoreClientClass)) + // .thenReturn(metaStoreClient); + // } catalog = new HiveCatalog(fileIO, hiveConf, metastoreClientClass, warehouse); } @@ -203,10 +204,11 @@ public void testAddHiveTableParameters() { addHiveTableParametersSchema, false); - Field clientField = HiveCatalog.class.getDeclaredField("client"); + Field clientField = HiveCatalog.class.getDeclaredField("clients"); clientField.setAccessible(true); - IMetaStoreClient client = (IMetaStoreClient) clientField.get(catalog); - Table table = client.getTable(databaseName, tableName); + ClientPool clients = + (ClientPool) clientField.get(catalog); + Table table = clients.run(client -> client.getTable(databaseName, tableName)); Map tableProperties = table.getParameters(); // Verify the transformed parameters @@ -258,10 +260,11 @@ public void testAlterHiveTableParameters() { Arrays.asList(schemaChange1, schemaChange2), false); - Field clientField = HiveCatalog.class.getDeclaredField("client"); + Field clientField = HiveCatalog.class.getDeclaredField("clients"); clientField.setAccessible(true); - IMetaStoreClient client = (IMetaStoreClient) clientField.get(catalog); - Table table = client.getTable(databaseName, tableName); + ClientPool clients = + (ClientPool) clientField.get(catalog); + Table table = clients.run(client -> client.getTable(databaseName, tableName)); Map tableProperties = table.getParameters(); assertThat(tableProperties).containsEntry("table.owner", "Hms"); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java index deba6c3955c4..3c0d9ab1ec83 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.hive.annotation.Minio; @@ -41,7 +42,9 @@ import com.klarna.hiverunner.HiveShell; import com.klarna.hiverunner.annotations.HiveSQL; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; import 
org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -75,7 +78,7 @@ public class HiveLocationTest { private HiveCatalog catalog; - private IMetaStoreClient hmsClient; + private ClientPool hmsClient; private String objectStorepath; @@ -145,7 +148,8 @@ public void testCatalogDBLocation() throws Exception { List paths = new ArrayList<>(); for (String db : dbs) { catalog.createDatabase(db, true); - assertThat(hmsClient.getDatabase(db)).isNotNull(); + Database database = hmsClient.run(client -> client.getDatabase(db)); + assertThat(database).isNotNull(); Path actual = catalog.newDatabasePath(db); Path expected = new Path(this.objectStorepath + "/" + db + ".db"); @@ -192,8 +196,11 @@ public void testCatalogTableLocation() throws Exception { false); Table hmsClientTablea = - hmsClient.getTable( - tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()); + hmsClient.run( + client -> + client.getTable( + tableIdentifier.getDatabaseName(), + tableIdentifier.getObjectName())); String location = hmsClientTablea.getParameters().get(LocationKeyExtractor.TBPROPERTIES_LOCATION_KEY); String expected = this.objectStorepath + "/" + db + ".db" + "/" + table;