From 8a0aec65ee5c91857fd15d254a34ad0b9daab8f7 Mon Sep 17 00:00:00 2001 From: Fang Yong Date: Fri, 15 Mar 2024 16:27:09 +0800 Subject: [PATCH] [core] Support configuring lock in paimon catalog (#2933) * [core] Support configuring lock in paimon catalog --- .../generated/catalog_configuration.html | 8 ++- .../apache/paimon/options/CatalogOptions.java | 8 ++- .../paimon/catalog/AbstractCatalog.java | 35 ++++++++++ .../paimon/catalog/FileSystemCatalog.java | 30 ++++++--- .../org/apache/paimon/jdbc/JdbcCatalog.java | 15 ++--- .../paimon/jdbc/JdbcCatalogFactory.java | 12 +++- .../apache/paimon/jdbc/JdbcCatalogLock.java | 40 ++++++------ .../org.apache.paimon.factories.Factory | 1 + .../apache/paimon/jdbc/JdbcCatalogTest.java | 1 + .../paimon/flink/FileSystemCatalogITCase.java | 65 ++++++++++++++++++- .../org.apache.paimon.factories.Factory | 5 +- .../org/apache/paimon/hive/HiveCatalog.java | 27 ++++---- .../apache/paimon/hive/HiveCatalogLock.java | 14 ++-- .../org.apache.paimon.factories.Factory | 3 + 14 files changed, 197 insertions(+), 67 deletions(-) diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index e685559447e2..cab6e731e851 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -62,11 +62,17 @@ Boolean Enable Catalog Lock. + +
lock.type
+ (none) + String + The Lock Type for Catalog, such as 'hive', 'zookeeper'. +
metastore
"filesystem" String - Metastore of paimon catalog, supports filesystem、hive and jdbc. + Metastore of paimon catalog, supports filesystem, hive and jdbc.
table.type
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 42cd9e418844..f00a35a75094 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -40,7 +40,7 @@ public class CatalogOptions { .stringType() .defaultValue("filesystem") .withDescription( - "Metastore of paimon catalog, supports filesystem、hive and jdbc."); + "Metastore of paimon catalog, supports filesystem, hive and jdbc."); public static final ConfigOption URI = ConfigOptions.key("uri") @@ -60,6 +60,12 @@ public class CatalogOptions { .defaultValue(false) .withDescription("Enable Catalog Lock."); + public static final ConfigOption LOCK_TYPE = + ConfigOptions.key("lock.type") + .stringType() + .noDefaultValue() + .withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'."); + public static final ConfigOption LOCK_CHECK_MAX_SLEEP = key("lock-check-max-sleep") .durationType() diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 0dbbb1f40302..c69a72b0db6a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -46,10 +46,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -80,6 +83,30 @@ protected AbstractCatalog(FileIO fileIO, Options options) { this.tableDefaultOptions = convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX); this.catalogOptions = options; + + if (lockEnabled()) { + checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled."); + } + } + + @Override + public Optional lockFactory() { + return lockEnabled() + ? Optional.of( + FactoryUtil.discoverFactory( + AbstractCatalog.class.getClassLoader(), + CatalogLock.LockFactory.class, + catalogOptions.get(LOCK_TYPE))) + : Optional.empty(); + } + + @Override + public Optional lockContext() { + return Optional.of(new OptionLockContext(catalogOptions)); + } + + protected boolean lockEnabled() { + return catalogOptions.get(LOCK_ENABLED); } @Override @@ -465,4 +492,12 @@ private void validateAutoCreateClose(Map options) { "The value of %s property should be %s.", CoreOptions.AUTO_CREATE.key(), Boolean.FALSE)); } + + static class OptionLockContext implements CatalogLock.LockContext { + private final Options catalogOptions; + + public OptionLockContext(Options catalogOptions) { + this.catalogOptions = catalogOptions; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 0c01e9cb7a71..e71c92dc4007 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -21,6 +21,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -34,7 +35,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; @@ -56,11 +56,6 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) { this.warehouse = warehouse; } - @Override - public Optional lockFactory() { - return Optional.empty(); - } - @Override public List listDatabases() { List databases = new ArrayList<>(); @@ -128,8 +123,7 @@ private boolean tableExists(Path tablePath) { @Override public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { - Path path = getDataTableLocation(identifier); - return new SchemaManager(fileIO, path) + return schemaManager(identifier) .latest() .orElseThrow(() -> new TableNotExistException(identifier)); } @@ -142,8 +136,24 @@ protected void dropTableImpl(Identifier identifier) { @Override public void createTableImpl(Identifier identifier, Schema schema) { + uncheck(() -> schemaManager(identifier).createTable(schema)); + } + + private SchemaManager schemaManager(Identifier identifier) { Path path = getDataTableLocation(identifier); - uncheck(() -> new SchemaManager(fileIO, path).createTable(schema)); + CatalogLock catalogLock = + lockFactory() + .map( + fac -> + fac.create( + lockContext() + .orElseThrow( + () -> + new RuntimeException( + "No lock context when lock is enabled.")))) + .orElse(null); + return new SchemaManager(fileIO, path) + .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)); } @Override @@ -156,7 +166,7 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) { @Override protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - new SchemaManager(fileIO, getDataTableLocation(identifier)).commitChanges(changes); + schemaManager(identifier).commitChanges(changes); } private static T uncheck(Callable callable) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 61dc5959c5b5..689a93ee91f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -26,6 +26,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -55,7 +56,6 @@ import static org.apache.paimon.jdbc.JdbcUtils.execute; import static org.apache.paimon.jdbc.JdbcUtils.insertProperties; import static org.apache.paimon.jdbc.JdbcUtils.updateTable; -import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; /* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for @@ -76,7 +76,7 @@ public class JdbcCatalog extends AbstractCatalog { protected JdbcCatalog( FileIO fileIO, String catalogKey, Map config, String warehouse) { - super(fileIO); + super(fileIO, Options.fromMap(config)); this.catalogKey = catalogKey; this.options = config; this.warehouse = warehouse; @@ -347,15 +347,8 @@ public boolean caseSensitive() { } @Override - public Optional lockFactory() { - return lockEnabled() - ? Optional.of(JdbcCatalogLock.createFactory(connections, catalogKey, options)) - : Optional.empty(); - } - - private boolean lockEnabled() { - return Boolean.parseBoolean( - options.getOrDefault(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString())); + public Optional lockContext() { + return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, catalogKey, options)); } private Lock lock(Identifier identifier) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java index ff438a8c8f85..5e605923206b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -23,6 +23,10 @@ import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; + +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; /** Factory to create {@link JdbcCatalog}. */ public class JdbcCatalogFactory implements CatalogFactory { @@ -36,7 +40,13 @@ public String identifier() { @Override public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { - String catalogKey = context.options().get(JdbcCatalogOptions.CATALOG_KEY); + Options options = context.options(); + String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY); + if (options.get(LOCK_ENABLED)) { + if (!options.getOptional(LOCK_TYPE).isPresent()) { + options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER); + } + } return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java index 94287cb6e0a0..85f15f7f9d53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -86,26 +86,11 @@ public void close() throws IOException { // Do nothing } - /** Create a jdbc lock factory. */ - public static LockFactory createFactory( - JdbcClientPool connections, String catalogName, Map conf) { - return new JdbcCatalogLockFactory(connections, catalogName, conf); - } - - private static class JdbcCatalogLockFactory implements LockFactory { + /** Jdbc catalog lock factory. */ + public static class JdbcCatalogLockFactory implements LockFactory { private static final long serialVersionUID = 1L; - private static final String IDENTIFIER = "jdbc"; - private final JdbcClientPool connections; - private final String catalogName; - private final Map conf; - - public JdbcCatalogLockFactory( - JdbcClientPool connections, String catalogName, Map conf) { - this.connections = connections; - this.catalogName = catalogName; - this.conf = conf; - } + public static final String IDENTIFIER = "jdbc"; @Override public String identifier() { @@ -114,8 +99,25 @@ public String identifier() { @Override public CatalogLock create(LockContext context) { + JdbcLockContext lockContext = (JdbcLockContext) context; return new JdbcCatalogLock( - connections, catalogName, checkMaxSleep(conf), acquireTimeout(conf)); + lockContext.connections, + lockContext.catalogName, + checkMaxSleep(lockContext.conf), + acquireTimeout(lockContext.conf)); + } + } + + static class JdbcLockContext implements LockContext { + private final JdbcClientPool connections; + private final String catalogName; + private final Map conf; + + public JdbcLockContext( + JdbcClientPool connections, String catalogName, Map conf) { + this.connections = connections; + this.catalogName = catalogName; + this.conf = conf; } } diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index cc2b3f063138..34de4106bada 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -15,3 +15,4 @@ org.apache.paimon.catalog.FileSystemCatalogFactory org.apache.paimon.jdbc.JdbcCatalogFactory +org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index 5cc79fc85da3..d03c64bd825e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -55,6 +55,7 @@ private JdbcCatalog initCatalog(Map props) { properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); properties.put(CatalogOptions.WAREHOUSE.key(), warehouse); properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); properties.putAll(props); JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", properties, warehouse); assertThat(catalog.warehouse()).isEqualTo(warehouse); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index 71f93553d49d..b68d65dd2251 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; @@ -35,15 +36,19 @@ import org.junit.jupiter.api.Test; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** ITCase for {@link FlinkCatalog}. */ public class FileSystemCatalogITCase extends AbstractTestBase { + private static final AtomicInteger LOCK_COUNT = new AtomicInteger(0); private static final String DB_NAME = "default"; @@ -113,7 +118,7 @@ public void testCatalogOptionsInheritAndOverride() throws Exception { + "'table-default.opt2'='value2', " + "'table-default.opt3'='value3', " + "'fs.allow-hadoop-fallback'='false'," - + "'lock.enabled'='true'" + + "'lock.enabled'='false'" + ")", path)); tEnv.useCatalog("fs_with_options"); @@ -146,6 +151,39 @@ public void testCatalogOptionsInheritAndOverride() throws Exception { assertThat(tableOptions).doesNotContainKey("lock.enabled"); } + @Test + void testCatalogWithLockForSchema() throws Exception { + LOCK_COUNT.set(0); + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "CREATE CATALOG fs_with_lock WITH (" + + "'type'='paimon', " + + "'warehouse'='%s', " + + "'lock.enabled'='true'" + + ")", + path)) + .await()) + .hasRootCauseMessage("No lock type when lock is enabled."); + tEnv.executeSql( + String.format( + "CREATE CATALOG fs_with_lock WITH (" + + "'type'='paimon', " + + "'warehouse'='%s', " + + "'lock.enabled'='true'," + + "'lock.type'='DUMMY'" + + ")", + path)) + .await(); + tEnv.useCatalog("fs_with_lock"); + tEnv.executeSql("CREATE TABLE table1 (a STRING, b STRING, c STRING)").await(); + tEnv.executeSql("CREATE TABLE table2 (a STRING, b STRING, c STRING)").await(); + tEnv.executeSql("CREATE TABLE table3 (a STRING, b STRING, c STRING)").await(); + tEnv.executeSql("DROP TABLE table3").await(); + assertThat(LOCK_COUNT.get()).isEqualTo(3); + } + private void innerTestWriteRead() throws Exception { tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await(); BlockingIterator iterator = @@ -163,4 +201,29 @@ private List collect(String sql) throws Exception { } return result; } + + /** Lock factory for file system catalog. */ + public static class FileSystemCatalogDummyLockFactory implements CatalogLock.LockFactory { + private static final String IDENTIFIER = "DUMMY"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public CatalogLock create(CatalogLock.LockContext context) { + return new CatalogLock() { + @Override + public T runWithLock(String database, String table, Callable callable) + throws Exception { + LOCK_COUNT.incrementAndGet(); + return callable.call(); + } + + @Override + public void close() throws IOException {} + }; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory index 22e88ba48420..fcb6fe982943 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -16,4 +16,7 @@ org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory # Lineage meta factory -org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory \ No newline at end of file +org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory + +# Catalog lock factory +org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory \ No newline at end of file diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 024857976888..589e920370e0 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -77,6 +77,7 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; +import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; @@ -84,6 +85,7 @@ import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -146,11 +148,6 @@ public HiveCatalog( this.client = createClient(hiveConf, clientClassName); } - @Override - public Optional lockFactory() { - return lockEnabled() ? Optional.of(HiveCatalogLock.createFactory()) : Optional.empty(); - } - @Override public Optional lockContext() { return Optional.of( @@ -158,11 +155,6 @@ public Optional lockContext() { new SerializableHiveConf(hiveConf), clientClassName)); } - private boolean lockEnabled() { - return Boolean.parseBoolean( - hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString())); - } - @Override public Optional metastoreClientFactory(Identifier identifier) { try { @@ -670,7 +662,8 @@ public static boolean isEmbeddedMetastore(HiveConf hiveConf) { public static Catalog createHiveCatalog(CatalogContext context) { HiveConf hiveConf = createHiveConf(context); - String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE); + Options options = context.options(); + String warehouseStr = options.get(CatalogOptions.WAREHOUSE); if (warehouseStr == null) { warehouseStr = hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal); @@ -687,11 +680,19 @@ public static Catalog createHiveCatalog(CatalogContext context) { } catch (IOException e) { throw new UncheckedIOException(e); } + + /** Hive catalog only support hive lock. */ + if (options.getOptional(LOCK_ENABLED).orElse(false)) { + Optional lockType = options.getOptional(LOCK_TYPE); + if (!lockType.isPresent()) { + options.set(LOCK_TYPE, LOCK_IDENTIFIER); + } + } return new HiveCatalog( fileIO, hiveConf, - context.options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS), - context.options(), + options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS), + options, warehouse.toUri().toString()); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java index 1635d80960f1..c49cd020c654 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java @@ -44,6 +44,8 @@ /** Hive {@link CatalogLock}. */ public class HiveCatalogLock implements CatalogLock { + static final String LOCK_IDENTIFIER = "hive"; + private final IMetaStoreClient client; private final long checkMaxSleep; private final long acquireTimeout; @@ -112,17 +114,11 @@ public void close() { this.client.close(); } - /** Create a hive lock factory. */ - public static LockFactory createFactory() { - return new HiveCatalogLockFactory(); - } - - private static class HiveCatalogLockFactory implements LockFactory { + /** Catalog lock factory for hive. */ + public static class HiveCatalogLockFactory implements LockFactory { private static final long serialVersionUID = 1L; - private static final String IDENTIFIER = "hive"; - @Override public CatalogLock create(LockContext context) { checkArgument(context instanceof HiveLockContext); @@ -136,7 +132,7 @@ public CatalogLock create(LockContext context) { @Override public String identifier() { - return IDENTIFIER; + return LOCK_IDENTIFIER; } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index dcc7e6554426..d4af13cc08e6 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -14,3 +14,6 @@ # limitations under the License. org.apache.paimon.hive.HiveCatalogFactory + +# Hive catalog lock factory +org.apache.paimon.hive.HiveCatalogLock$HiveCatalogLockFactory