diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 0fba499dd13ba..c404469762aa5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -59,6 +59,12 @@ public class CatalogOptions { .defaultValue(false) .withDescription("Enable Catalog Lock."); + public static final ConfigOption LOCK_TYPE = + ConfigOptions.key("lock.type") + .stringType() + .noDefaultValue() + .withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'."); + public static final ConfigOption LOCK_CHECK_MAX_SLEEP = key("lock-check-max-sleep") .durationType() diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 3c0b0063b6327..602da69496847 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -46,10 +46,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -80,6 +83,30 @@ protected AbstractCatalog(FileIO fileIO, Options options) { this.tableDefaultOptions = convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX); this.catalogOptions = options; + + if (lockEnabled()) { + checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled."); + } + } + + @Override + public Optional lockFactory() { + return lockEnabled() + ? Optional.of( + FactoryUtil.discoverFactory( + AbstractCatalog.class.getClassLoader(), + CatalogLock.LockFactory.class, + catalogOptions.get(LOCK_TYPE))) + : Optional.empty(); + } + + @Override + public Optional lockContext() { + return Optional.of(new OptionLockContext(catalogOptions)); + } + + protected boolean lockEnabled() { + return catalogOptions.get(LOCK_ENABLED); } @Override @@ -465,4 +492,12 @@ private void validateAutoCreateClose(Map options) { "The value of %s property should be %s.", CoreOptions.AUTO_CREATE.key(), Boolean.FALSE)); } + + static class OptionLockContext implements CatalogLock.LockContext { + private final Options catalogOptions; + + public OptionLockContext(Options catalogOptions) { + this.catalogOptions = catalogOptions; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 0c01e9cb7a71b..e71c92dc4007f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -21,6 +21,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -34,7 +35,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; @@ -56,11 +56,6 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) { this.warehouse = warehouse; } - @Override - public Optional lockFactory() { - return Optional.empty(); - } - @Override public List listDatabases() { List databases = new ArrayList<>(); @@ -128,8 +123,7 @@ private boolean tableExists(Path tablePath) { @Override public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { - Path path = getDataTableLocation(identifier); - return new SchemaManager(fileIO, path) + return schemaManager(identifier) .latest() .orElseThrow(() -> new TableNotExistException(identifier)); } @@ -142,8 +136,24 @@ protected void dropTableImpl(Identifier identifier) { @Override public void createTableImpl(Identifier identifier, Schema schema) { + uncheck(() -> schemaManager(identifier).createTable(schema)); + } + + private SchemaManager schemaManager(Identifier identifier) { Path path = getDataTableLocation(identifier); - uncheck(() -> new SchemaManager(fileIO, path).createTable(schema)); + CatalogLock catalogLock = + lockFactory() + .map( + fac -> + fac.create( + lockContext() + .orElseThrow( + () -> + new RuntimeException( + "No lock context when lock is enabled.")))) + .orElse(null); + return new SchemaManager(fileIO, path) + .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)); } @Override @@ -156,7 +166,7 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) { @Override protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - new SchemaManager(fileIO, getDataTableLocation(identifier)).commitChanges(changes); + schemaManager(identifier).commitChanges(changes); } private static T uncheck(Callable callable) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index 71f93553d49d9..b68d65dd22510 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; @@ -35,15 +36,19 @@ import org.junit.jupiter.api.Test; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** ITCase for {@link FlinkCatalog}. */ public class FileSystemCatalogITCase extends AbstractTestBase { + private static final AtomicInteger LOCK_COUNT = new AtomicInteger(0); private static final String DB_NAME = "default"; @@ -113,7 +118,7 @@ public void testCatalogOptionsInheritAndOverride() throws Exception { + "'table-default.opt2'='value2', " + "'table-default.opt3'='value3', " + "'fs.allow-hadoop-fallback'='false'," - + "'lock.enabled'='true'" + + "'lock.enabled'='false'" + ")", path)); tEnv.useCatalog("fs_with_options"); @@ -146,6 +151,39 @@ public void testCatalogOptionsInheritAndOverride() throws Exception { assertThat(tableOptions).doesNotContainKey("lock.enabled"); } + @Test + void testCatalogWithLockForSchema() throws Exception { + LOCK_COUNT.set(0); + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "CREATE CATALOG fs_with_lock WITH (" + + "'type'='paimon', " + + "'warehouse'='%s', " + + "'lock.enabled'='true'" + + ")", + path)) + .await()) + .hasRootCauseMessage("No lock type when lock is enabled."); + tEnv.executeSql( + String.format( + "CREATE CATALOG fs_with_lock WITH (" + + "'type'='paimon', " + + "'warehouse'='%s', " + + "'lock.enabled'='true'," + + "'lock.type'='DUMMY'" + + ")", + path)) + .await(); + tEnv.useCatalog("fs_with_lock"); + tEnv.executeSql("CREATE TABLE table1 (a STRING, b STRING, c STRING)").await(); + tEnv.executeSql("CREATE TABLE table2 (a STRING, b STRING, c STRING)").await(); + tEnv.executeSql("CREATE TABLE table3 (a STRING, b STRING, c STRING)").await(); + tEnv.executeSql("DROP TABLE table3").await(); + assertThat(LOCK_COUNT.get()).isEqualTo(3); + } + private void innerTestWriteRead() throws Exception { tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await(); BlockingIterator iterator = @@ -163,4 +201,29 @@ private List collect(String sql) throws Exception { } return result; } + + /** Lock factory for file system catalog. */ + public static class FileSystemCatalogDummyLockFactory implements CatalogLock.LockFactory { + private static final String IDENTIFIER = "DUMMY"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public CatalogLock create(CatalogLock.LockContext context) { + return new CatalogLock() { + @Override + public T runWithLock(String database, String table, Callable callable) + throws Exception { + LOCK_COUNT.incrementAndGet(); + return callable.call(); + } + + @Override + public void close() throws IOException {} + }; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory index 22e88ba484201..fcb6fe982943f 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -16,4 +16,7 @@ org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory # Lineage meta factory -org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory \ No newline at end of file +org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory + +# Catalog lock factory +org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory \ No newline at end of file diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 07463302c78e0..5fec1438b54de 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -77,13 +77,14 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; +import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; -import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -144,6 +145,18 @@ public HiveCatalog( locationHelper = new StorageLocationHelper(); } + /** Hive catalog only support hive lock. */ + if (lockEnabled()) { + Optional lockType = catalogOptions.getOptional(LOCK_TYPE); + if (lockType.isPresent()) { + checkArgument( + LOCK_IDENTIFIER.equals(lockType.get()), + "Hive catalog only support hive lock type"); + } else { + catalogOptions.set(LOCK_TYPE, LOCK_IDENTIFIER); + } + } + this.client = createClient(hiveConf, clientClassName); } @@ -159,11 +172,6 @@ public Optional lockContext() { new SerializableHiveConf(hiveConf), clientClassName)); } - private boolean lockEnabled() { - return Boolean.parseBoolean( - hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString())); - } - @Override public Optional metastoreClientFactory(Identifier identifier) { try { @@ -677,7 +685,8 @@ public static boolean isEmbeddedMetastore(HiveConf hiveConf) { public static Catalog createHiveCatalog(CatalogContext context) { HiveConf hiveConf = createHiveConf(context); - String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE); + Options options = context.options(); + String warehouseStr = options.get(CatalogOptions.WAREHOUSE); if (warehouseStr == null) { warehouseStr = hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal); @@ -697,8 +706,8 @@ public static Catalog createHiveCatalog(CatalogContext context) { return new HiveCatalog( fileIO, hiveConf, - context.options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS), - context.options(), + options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS), + options, warehouse.toUri().toString()); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java index 1635d80960f1c..f6b34513dbec0 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java @@ -44,6 +44,8 @@ /** Hive {@link CatalogLock}. */ public class HiveCatalogLock implements CatalogLock { + static final String LOCK_IDENTIFIER = "hive"; + private final IMetaStoreClient client; private final long checkMaxSleep; private final long acquireTimeout; @@ -121,8 +123,6 @@ private static class HiveCatalogLockFactory implements LockFactory { private static final long serialVersionUID = 1L; - private static final String IDENTIFIER = "hive"; - @Override public CatalogLock create(LockContext context) { checkArgument(context instanceof HiveLockContext); @@ -136,7 +136,7 @@ public CatalogLock create(LockContext context) { @Override public String identifier() { - return IDENTIFIER; + return LOCK_IDENTIFIER; } }