diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 2499e075262e5..12536a930ae3e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -26,6 +26,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -76,7 +77,7 @@ public class JdbcCatalog extends AbstractCatalog { protected JdbcCatalog( FileIO fileIO, String catalogKey, Map config, String warehouse) { - super(fileIO); + super(fileIO, Options.fromMap(config)); this.catalogKey = StringUtils.isBlank(catalogKey) ? "jdbc" : catalogKey; this.options = config; this.warehouse = warehouse; @@ -347,10 +348,8 @@ public boolean caseSensitive() { } @Override - public Optional lockFactory() { - return lockEnabled() - ? Optional.of(JdbcCatalogLock.createFactory(connections, catalogKey, options)) - : Optional.empty(); + public Optional lockContext() { + return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, catalogKey, options)); } private Lock lock(Identifier identifier) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java index ff438a8c8f85b..5e605923206b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -23,6 +23,10 @@ import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; + +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; /** Factory to create {@link JdbcCatalog}. */ public class JdbcCatalogFactory implements CatalogFactory { @@ -36,7 +40,13 @@ public String identifier() { @Override public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { - String catalogKey = context.options().get(JdbcCatalogOptions.CATALOG_KEY); + Options options = context.options(); + String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY); + if (options.get(LOCK_ENABLED)) { + if (!options.getOptional(LOCK_TYPE).isPresent()) { + options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER); + } + } return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java index 94287cb6e0a05..85f15f7f9d533 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -86,26 +86,11 @@ public void close() throws IOException { // Do nothing } - /** Create a jdbc lock factory. */ - public static LockFactory createFactory( - JdbcClientPool connections, String catalogName, Map conf) { - return new JdbcCatalogLockFactory(connections, catalogName, conf); - } - - private static class JdbcCatalogLockFactory implements LockFactory { + /** Jdbc catalog lock factory. */ + public static class JdbcCatalogLockFactory implements LockFactory { private static final long serialVersionUID = 1L; - private static final String IDENTIFIER = "jdbc"; - private final JdbcClientPool connections; - private final String catalogName; - private final Map conf; - - public JdbcCatalogLockFactory( - JdbcClientPool connections, String catalogName, Map conf) { - this.connections = connections; - this.catalogName = catalogName; - this.conf = conf; - } + public static final String IDENTIFIER = "jdbc"; @Override public String identifier() { @@ -114,8 +99,25 @@ public String identifier() { @Override public CatalogLock create(LockContext context) { + JdbcLockContext lockContext = (JdbcLockContext) context; return new JdbcCatalogLock( - connections, catalogName, checkMaxSleep(conf), acquireTimeout(conf)); + lockContext.connections, + lockContext.catalogName, + checkMaxSleep(lockContext.conf), + acquireTimeout(lockContext.conf)); + } + } + + static class JdbcLockContext implements LockContext { + private final JdbcClientPool connections; + private final String catalogName; + private final Map conf; + + public JdbcLockContext( + JdbcClientPool connections, String catalogName, Map conf) { + this.connections = connections; + this.catalogName = catalogName; + this.conf = conf; } } diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index cc2b3f0631386..34de4106bada4 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -15,3 +15,4 @@ org.apache.paimon.catalog.FileSystemCatalogFactory org.apache.paimon.jdbc.JdbcCatalogFactory +org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index 5cc79fc85da31..d03c64bd825ee 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -55,6 +55,7 @@ private JdbcCatalog initCatalog(Map props) { properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); properties.put(CatalogOptions.WAREHOUSE.key(), warehouse); properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); properties.putAll(props); JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", properties, warehouse); assertThat(catalog.warehouse()).isEqualTo(warehouse);