diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 89fd52f6cfe6..3c0b0063b632 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -304,7 +304,8 @@ private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistE getDataTableLocation(identifier), tableSchema, new CatalogEnvironment( - Lock.factory(lockFactory().orElse(null), identifier), + Lock.factory( + lockFactory().orElse(null), lockContext().orElse(null), identifier), metastoreClientFactory(identifier).orElse(null), lineageMetaFactory)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 005f5b16aad3..0168891b008a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -49,7 +49,12 @@ public interface Catalog extends AutoCloseable { * Get lock factory from catalog. Lock is used to support multiple concurrent writes on the * object store. */ - Optional lockFactory(); + Optional lockFactory(); + + /** Get lock context for lock factory to create a lock. */ + default Optional lockContext() { + return Optional.empty(); + } /** Get metastore client factory for the table specified by {@code identifier}. */ default Optional metastoreClientFactory(Identifier identifier) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java index 278b3ad631af..0e547037e3a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java @@ -19,6 +19,7 @@ package org.apache.paimon.catalog; import org.apache.paimon.annotation.Public; +import org.apache.paimon.factories.Factory; import java.io.Closeable; import java.io.Serializable; @@ -36,7 +37,10 @@ public interface CatalogLock extends Closeable { T runWithLock(String database, String table, Callable callable) throws Exception; /** Factory to create {@link CatalogLock}. */ - interface Factory extends Serializable { - CatalogLock create(); + interface LockFactory extends Factory, Serializable { + CatalogLock create(LockContext context); } + + /** Context for lock factory to create lock. */ + interface LockContext extends Serializable {} } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index e458dad7c34e..0c01e9cb7a71 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -57,7 +57,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) { } @Override - public Optional lockFactory() { + public Optional lockFactory() { return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java index 499a7ca6a319..a9f27e70ae6a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java @@ -43,10 +43,13 @@ interface Factory extends Serializable { Lock create(); } - static Factory factory(@Nullable CatalogLock.Factory lockFactory, Identifier tablePath) { + static Factory factory( + @Nullable CatalogLock.LockFactory lockFactory, + @Nullable CatalogLock.LockContext lockContext, + Identifier tablePath) { return lockFactory == null ? new EmptyFactory() - : new CatalogLockFactory(lockFactory, tablePath); + : new CatalogLockFactory(lockFactory, lockContext, tablePath); } static Factory emptyFactory() { @@ -58,17 +61,22 @@ class CatalogLockFactory implements Factory { private static final long serialVersionUID = 1L; - private final CatalogLock.Factory lockFactory; + private final CatalogLock.LockFactory lockFactory; + private final CatalogLock.LockContext lockContext; private final Identifier tablePath; - public CatalogLockFactory(CatalogLock.Factory lockFactory, Identifier tablePath) { + public CatalogLockFactory( + CatalogLock.LockFactory lockFactory, + CatalogLock.LockContext lockContext, + Identifier tablePath) { this.lockFactory = lockFactory; + this.lockContext = lockContext; this.tablePath = tablePath; } @Override public Lock create() { - return fromCatalog(lockFactory.create(), tablePath); + return fromCatalog(lockFactory.create(lockContext), tablePath); } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 7d50b7e8b134..07463302c78e 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -148,10 +148,15 @@ public HiveCatalog( } @Override - public Optional lockFactory() { - return lockEnabled() - ? Optional.of(HiveCatalogLock.createFactory(hiveConf, clientClassName)) - : Optional.empty(); + public Optional lockFactory() { + return lockEnabled() ? Optional.of(HiveCatalogLock.createFactory()) : Optional.empty(); + } + + @Override + public Optional lockContext() { + return Optional.of( + new HiveCatalogLock.HiveLockContext( + new SerializableHiveConf(hiveConf), clientClassName)); } private boolean lockEnabled() { diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java index 0f6cd4837ff8..898f6bf57af1 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java @@ -39,6 +39,7 @@ import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT; import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Hive {@link CatalogLock}. */ public class HiveCatalogLock implements CatalogLock { @@ -112,30 +113,31 @@ public void close() { } /** Create a hive lock factory. */ - public static CatalogLock.Factory createFactory(HiveConf hiveConf, String clientClassName) { - return new HiveCatalogLockFactory(hiveConf, clientClassName); + public static LockFactory createFactory() { + return new HiveCatalogLockLockFactory(); } - private static class HiveCatalogLockFactory implements CatalogLock.Factory { + private static class HiveCatalogLockLockFactory implements LockFactory { private static final long serialVersionUID = 1L; - private final SerializableHiveConf hiveConf; - private final String clientClassName; - - public HiveCatalogLockFactory(HiveConf hiveConf, String clientClassName) { - this.hiveConf = new SerializableHiveConf(hiveConf); - this.clientClassName = clientClassName; - } + private static final String IDENTIFIER = "hive"; @Override - public CatalogLock create() { - HiveConf conf = hiveConf.conf(); + public CatalogLock create(LockContext context) { + checkArgument(context instanceof HiveLockContext); + HiveLockContext hiveLockContext = (HiveLockContext) context; + HiveConf conf = hiveLockContext.hiveConf.conf(); return new HiveCatalogLock( - HiveCatalog.createClient(conf, clientClassName), + HiveCatalog.createClient(conf, hiveLockContext.clientClassName), checkMaxSleep(conf), acquireTimeout(conf)); } + + @Override + public String identifier() { + return IDENTIFIER; + } } public static long checkMaxSleep(HiveConf conf) { @@ -153,4 +155,14 @@ public static long acquireTimeout(HiveConf conf) { TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue()))) .toMillis(); } + + static class HiveLockContext implements LockContext { + private final SerializableHiveConf hiveConf; + private final String clientClassName; + + public HiveLockContext(SerializableHiveConf hiveConf, String clientClassName) { + this.hiveConf = hiveConf; + this.clientClassName = clientClassName; + } + } } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index 051a23979023..9394355605b3 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -689,11 +689,9 @@ public void testAlterTable() throws Exception { @Test public void testHiveLock() throws InterruptedException { tEnv.executeSql("CREATE TABLE t (a INT)"); - CatalogLock.Factory lockFactory = - ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()) - .catalog() - .lockFactory() - .get(); + Catalog catalog = + ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog(); + CatalogLock.LockFactory lockFactory = catalog.lockFactory().get(); AtomicInteger count = new AtomicInteger(0); List threads = new ArrayList<>(); @@ -708,7 +706,7 @@ public void testHiveLock() throws InterruptedException { Thread thread = new Thread( () -> { - CatalogLock lock = lockFactory.create(); + CatalogLock lock = lockFactory.create(catalog.lockContext().get()); for (int j = 0; j < 10; j++) { try { lock.runWithLock("test_db", "t", unsafeIncrement);