diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md index 536a6b165ff71..c894f00e0e523 100644 --- a/docs/content/how-to/creating-catalogs.md +++ b/docs/content/how-to/creating-catalogs.md @@ -53,6 +53,17 @@ USE CATALOG my_catalog; You can define any default table options with the prefix `table-default.` for tables created in the catalog. + +The FileSystem catalog supports jdbc lock and can take effect through the following configuration: + +> ```shell +> 'lock.uri' = 'jdbc:mysql://:/' +> 'lock.jdbc.user' = '...', +> 'lock.jdbc.password' = '...', +> ``` + + + {{< /tab >}} {{< tab "Spark3" >}} diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index f00a35a750940..1fd7d59f964c1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -64,7 +64,8 @@ public class CatalogOptions { ConfigOptions.key("lock.type") .stringType() .noDefaultValue() - .withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'."); + .withDescription( + "The Lock Type for Catalog, such as 'hive', 'zookeeper', 'jdbc'."); public static final ConfigOption LOCK_CHECK_MAX_SLEEP = key("lock-check-max-sleep") diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 4a4fb04fd4e2d..e3ec650568378 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -62,6 +62,7 @@ public abstract class AbstractCatalog implements Catalog { public static final String DB_SUFFIX = ".db"; protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; protected static final String DB_LOCATION_PROP = "location"; + protected static final String LOCK_PROP_PREFIX = "lock."; protected final FileIO fileIO; protected final Map tableDefaultOptions; @@ -105,6 +106,30 @@ public Optional defaultLockFactory() { return Optional.empty(); } + @Override + public Optional lockContextFactory() { + String lock = catalogOptions.get(LOCK_TYPE); + if (lock == null) { + return Optional.empty(); + } + return Optional.of( + FactoryUtil.discoverFactory( + AbstractCatalog.class.getClassLoader(), + CatalogLockContextFactory.class, + lock)); + } + + public Options extractLockConfiguration(Map properties) { + Map result = new HashMap<>(); + properties.forEach( + (key, value) -> { + if (key.startsWith(LOCK_PROP_PREFIX)) { + result.put(key.substring(LOCK_PROP_PREFIX.length()), value); + } + }); + return Options.fromMap(result); + } + @Override public Optional lockContext() { return Optional.of(CatalogLockContext.fromOptions(catalogOptions)); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 99b71e8deb1e4..976e0b705b51d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -51,6 +51,11 @@ public interface Catalog extends AutoCloseable { */ Optional lockFactory(); + /** Get lock context factory for lock factory to create a lock. */ + default Optional lockContextFactory() { + return Optional.empty(); + } + /** Get lock context for lock factory to create a lock. */ default Optional lockContext() { return Optional.empty(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContextFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContextFactory.java new file mode 100644 index 0000000000000..2bd5bd7126417 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContextFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.factories.Factory; +import org.apache.paimon.options.Options; + +import java.io.Serializable; + +/** Context for lock context factory to create lock. */ +public interface CatalogLockContextFactory extends Factory, Serializable { + CatalogLockContext createLockContext(Options lockOptions, boolean closeConnectionsUsed); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 8ffe0f2719165..b33b890ac6d99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; @@ -198,4 +199,13 @@ public String warehouse() { public boolean caseSensitive() { return catalogOptions.get(CASE_SENSITIVE); } + + @Override + public Optional lockContext() { + return lockContextFactory() + .map( + factory -> + factory.createLockContext( + extractLockConfiguration(catalogOptions.toMap()), true)); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java index 8f4b6eede7e55..9fe6c51497016 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java @@ -22,6 +22,8 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.table.TableType; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; /** Factory to create {@link FileSystemCatalog}. */ @@ -40,6 +42,9 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { throw new IllegalArgumentException( "Only managed table is supported in File system catalog."); } + if (context.options().get(LOCK_ENABLED) && context.options().get(LOCK_TYPE) == null) { + throw new IllegalArgumentException("Please configure the lock type correctly."); + } return new FileSystemCatalog(fileIO, warehouse, context.options()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 7e7718b5bee96..053e64fd5e05f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -350,7 +350,7 @@ public Optional defaultLockFactory() { @Override public Optional lockContext() { - return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options)); + return Optional.of(new JdbcCatalogLockContext(connections, options, false)); } private Lock lock(Identifier identifier) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java index 307f92f0a570f..b2072d98fa30c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -36,6 +36,7 @@ public class JdbcCatalogLock implements CatalogLock { private final long checkMaxSleep; private final long acquireTimeout; private final String catalogKey; + private final boolean closeConnectionsUsed; public JdbcCatalogLock( JdbcClientPool connections, @@ -46,6 +47,20 @@ public JdbcCatalogLock( this.checkMaxSleep = checkMaxSleep; this.acquireTimeout = acquireTimeout; this.catalogKey = catalogKey; + this.closeConnectionsUsed = false; + } + + public JdbcCatalogLock( + JdbcClientPool connections, + String catalogKey, + long checkMaxSleep, + long acquireTimeout, + boolean closeConnectionsUsed) { + this.connections = connections; + this.checkMaxSleep = checkMaxSleep; + this.acquireTimeout = acquireTimeout; + this.catalogKey = catalogKey; + this.closeConnectionsUsed = closeConnectionsUsed; } @Override @@ -83,7 +98,9 @@ private void lock(String lockUniqueName) throws SQLException, InterruptedExcepti @Override public void close() throws IOException { - // Do nothing + if (closeConnectionsUsed) { + connections.close(); + } } public static long checkMaxSleep(Map conf) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java index e56b3474ccb3e..74f14a34b7a1c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java @@ -19,19 +19,29 @@ package org.apache.paimon.jdbc; import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import java.sql.SQLException; + /** Jdbc lock context. */ public class JdbcCatalogLockContext implements CatalogLockContext { - private final JdbcClientPool connections; + private JdbcClientPool connections; + private final boolean closeConnectionsUsed; private final String catalogKey; private final Options options; - public JdbcCatalogLockContext(JdbcClientPool connections, String catalogKey, Options options) { - this.connections = connections; - this.catalogKey = catalogKey; + public JdbcCatalogLockContext(Options options, boolean closeConnectionsUsed) { this.options = options; + this.catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY); + this.closeConnectionsUsed = closeConnectionsUsed; + } + + JdbcCatalogLockContext( + JdbcClientPool connections, Options options, boolean closeConnectionsUsed) { + this(options, closeConnectionsUsed); + this.connections = connections; } @Override @@ -39,11 +49,29 @@ public Options options() { return options; } - public JdbcClientPool connections() { - return connections; - } - public String catalogKey() { return catalogKey; } + + public boolean isCloseConnectionsUsed() { + return closeConnectionsUsed; + } + + public JdbcClientPool clientPool() { + if (this.connections == null) { + this.connections = + new JdbcClientPool( + options.get(CatalogOptions.CLIENT_POOL_SIZE), + options.get(CatalogOptions.URI.key()), + options.toMap()); + try { + JdbcUtils.createDistributedLockTable(connections, options); + } catch (SQLException e) { + throw new RuntimeException("Cannot initialize JDBC distributed lock.", e); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted in call to initialize", e); + } + } + return connections; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContextFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContextFactory.java new file mode 100644 index 0000000000000..0da82c31738d4 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContextFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockContextFactory; +import org.apache.paimon.options.Options; + +/** Factory for jdbc catalog lock context. */ +public class JdbcCatalogLockContextFactory implements CatalogLockContextFactory { + + @Override + public String identifier() { + return JdbcCatalogLockFactory.IDENTIFIER; + } + + @Override + public CatalogLockContext createLockContext(Options lockOptions, boolean closeConnectionsUsed) { + return new JdbcCatalogLockContext(lockOptions, closeConnectionsUsed); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java index ce0a2d24eea51..9eb1914d7bb02 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java @@ -44,9 +44,10 @@ public CatalogLock createLock(CatalogLockContext context) { JdbcCatalogLockContext lockContext = (JdbcCatalogLockContext) context; Map optionsMap = lockContext.options().toMap(); return new JdbcCatalogLock( - lockContext.connections(), + lockContext.clientPool(), lockContext.catalogKey(), checkMaxSleep(optionsMap), - acquireTimeout(optionsMap)); + acquireTimeout(optionsMap), + lockContext.isCloseConnectionsUsed()); } } diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 0f87c96b0d4e7..bf7c9333bc229 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -16,3 +16,4 @@ org.apache.paimon.catalog.FileSystemCatalogFactory org.apache.paimon.jdbc.JdbcCatalogFactory org.apache.paimon.jdbc.JdbcCatalogLockFactory +org.apache.paimon.jdbc.JdbcCatalogLockContextFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java new file mode 100644 index 0000000000000..ed597979a309b --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.jdbc.JdbcCatalog; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FileSystemCatalog}. */ +public class FileSystemCatalogTest extends CatalogTestBase { + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + catalog = initCatalog(Maps.newHashMap()); + } + + private FileSystemCatalog initCatalog(Map props) { + Map properties = Maps.newHashMap(); + properties.put( + AbstractCatalog.LOCK_PROP_PREFIX + CatalogOptions.URI.key(), + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); + + properties.put( + AbstractCatalog.LOCK_PROP_PREFIX + JdbcCatalog.PROPERTY_PREFIX + "username", + "user"); + properties.put( + AbstractCatalog.LOCK_PROP_PREFIX + JdbcCatalog.PROPERTY_PREFIX + "password", + "password"); + properties.put(CatalogOptions.WAREHOUSE.key(), warehouse); + properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); + properties.putAll(props); + FileSystemCatalog catalog = + new FileSystemCatalog(fileIO, new Path(warehouse), Options.fromMap(properties)); + return catalog; + } + + @Override + public void testListDatabasesWhenNoDatabases() { + List databases = catalog.listDatabases(); + assertThat(databases).isEqualTo(new ArrayList<>()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index 50d28cd112fc4..8106316bb3f4e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -22,10 +22,12 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockContextFactory; import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -195,7 +197,7 @@ private List collect(String sql) throws Exception { /** Lock factory for file system catalog. */ public static class FileSystemCatalogDummyLockFactory implements CatalogLockFactory { - private static final String IDENTIFIER = "DUMMY"; + public static final String IDENTIFIER = "DUMMY"; @Override public String identifier() { @@ -217,4 +219,19 @@ public void close() throws IOException {} }; } } + + /** Lock context factory for file system catalog. */ + public static class DummyCatalogLockContextFactory implements CatalogLockContextFactory { + + @Override + public String identifier() { + return FileSystemCatalogDummyLockFactory.IDENTIFIER; + } + + @Override + public CatalogLockContext createLockContext( + Options lockOptions, boolean closeConnectionsUsed) { + return CatalogLockContext.fromOptions(lockOptions); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory index fcb6fe982943f..170577467c10c 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -19,4 +19,8 @@ org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory # Catalog lock factory -org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory \ No newline at end of file +org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory + + +# Catalog lock context factory +org.apache.paimon.flink.FileSystemCatalogITCase$DummyCatalogLockContextFactory diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 372bfedefb888..68ca48bd790ed 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -689,14 +689,17 @@ public static Catalog createHiveCatalog(CatalogContext context) { } public static HiveConf createHiveConf(CatalogContext context) { - String uri = context.options().get(CatalogOptions.URI); - String hiveConfDir = context.options().get(HIVE_CONF_DIR); - String hadoopConfDir = context.options().get(HADOOP_CONF_DIR); - HiveConf hiveConf = - HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, context.hadoopConf()); + return createHiveConf(context.options(), context.hadoopConf()); + } + + public static HiveConf createHiveConf(Options options, Configuration hadoopConf) { + String uri = options.get(CatalogOptions.URI); + String hiveConfDir = options.get(HIVE_CONF_DIR); + String hadoopConfDir = options.get(HADOOP_CONF_DIR); + HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf); // always using user-set parameters overwrite hive-site.xml parameters - context.options().toMap().forEach(hiveConf::set); + options.toMap().forEach(hiveConf::set); if (uri != null) { hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java index ecffd7f1e633c..9d96edb247a38 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.CatalogLockContext; import org.apache.paimon.options.Options; +import org.apache.paimon.utils.HadoopUtils; /** Hive {@link CatalogLockContext}. */ public class HiveCatalogLockContext implements CatalogLockContext { @@ -28,6 +29,15 @@ public class HiveCatalogLockContext implements CatalogLockContext { private final String clientClassName; private final Options options; + public HiveCatalogLockContext(Options options) { + this.hiveConf = + new SerializableHiveConf( + HiveCatalog.createHiveConf( + options, HadoopUtils.getHadoopConfiguration(options))); + this.clientClassName = options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS); + this.options = options; + } + public HiveCatalogLockContext( SerializableHiveConf hiveConf, String clientClassName, Options options) { this.hiveConf = hiveConf; diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContextFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContextFactory.java new file mode 100644 index 0000000000000..95f253e56c99f --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContextFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockContextFactory; +import org.apache.paimon.options.Options; + +/** Hive {@link CatalogLockContextFactory}. */ +public class HiveCatalogLockContextFactory implements CatalogLockContextFactory { + @Override + public String identifier() { + return HiveCatalogLock.LOCK_IDENTIFIER; + } + + @Override + public CatalogLockContext createLockContext(Options lockOptions, boolean closeConnectionsUsed) { + return new HiveCatalogLockContext(lockOptions); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index baab921841290..e02ec0750d430 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -15,3 +15,4 @@ org.apache.paimon.hive.HiveCatalogFactory org.apache.paimon.hive.HiveCatalogLockFactory +org.apache.paimon.hive.HiveCatalogLockContextFactory