diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md index fc2927d4153a..536a6b165ff7 100644 --- a/docs/content/how-to/creating-catalogs.md +++ b/docs/content/how-to/creating-catalogs.md @@ -53,16 +53,6 @@ USE CATALOG my_catalog; You can define any default table options with the prefix `table-default.` for tables created in the catalog. -The FileSystem catalog supports jdbc lock and can take effect through the following configuration: - -> ```shell -> 'uri' = 'jdbc:mysql://:/' -> 'jdbc.user' = '...', -> 'jdbc.password' = '...', -> ``` - - - {{< /tab >}} {{< tab "Spark3" >}} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 0181d2db9e9f..5be55102a5cd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -83,26 +83,31 @@ protected AbstractCatalog(FileIO fileIO, Options options) { this.tableDefaultOptions = convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX); this.catalogOptions = options; + } - if (lockEnabled()) { - checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled."); + @Override + public Optional lockFactory() { + if (!lockEnabled()) { + return Optional.empty(); } + + String lock = catalogOptions.get(LOCK_TYPE); + if (lock == null) { + return defaultLockFactory(); + } + + return Optional.of( + FactoryUtil.discoverFactory( + AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock)); } - @Override - public Optional lockFactory() { - return lockEnabled() - ? Optional.of( - FactoryUtil.discoverFactory( - AbstractCatalog.class.getClassLoader(), - CatalogLock.LockFactory.class, - catalogOptions.get(LOCK_TYPE))) - : Optional.empty(); + public Optional defaultLockFactory() { + return Optional.empty(); } @Override - public Optional lockContext() { - return Optional.of(new OptionLockContext(catalogOptions)); + public Optional lockContext() { + return Optional.of(CatalogLockContext.fromOptions(catalogOptions)); } protected boolean lockEnabled() { @@ -498,12 +503,4 @@ private void validateAutoCreateClose(Map options) { "The value of %s property should be %s.", CoreOptions.AUTO_CREATE.key(), Boolean.FALSE)); } - - static class OptionLockContext implements CatalogLock.LockContext { - private final Options catalogOptions; - - public OptionLockContext(Options catalogOptions) { - this.catalogOptions = catalogOptions; - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index ffe33cb21731..07a6b274cf3a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -49,10 +49,10 @@ public interface Catalog extends AutoCloseable { * Get lock factory from catalog. Lock is used to support multiple concurrent writes on the * object store. */ - Optional lockFactory(); + Optional lockFactory(); /** Get lock context for lock factory to create a lock. */ - default Optional lockContext() { + default Optional lockContext() { return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java index 0e547037e3a2..4f147c6aada7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java @@ -19,10 +19,8 @@ package org.apache.paimon.catalog; import org.apache.paimon.annotation.Public; -import org.apache.paimon.factories.Factory; import java.io.Closeable; -import java.io.Serializable; import java.util.concurrent.Callable; /** @@ -35,12 +33,4 @@ public interface CatalogLock extends Closeable { /** Run with catalog lock. The caller should tell catalog the database and table name. */ T runWithLock(String database, String table, Callable callable) throws Exception; - - /** Factory to create {@link CatalogLock}. */ - interface LockFactory extends Factory, Serializable { - CatalogLock create(LockContext context); - } - - /** Context for lock factory to create lock. */ - interface LockContext extends Serializable {} } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java new file mode 100644 index 000000000000..442409c81be8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockContext.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.options.Options; + +import java.io.Serializable; + +/** Context for lock factory to create lock. */ +public interface CatalogLockContext extends Serializable { + + Options options(); + + static CatalogLockContext fromOptions(Options options) { + return () -> options; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java new file mode 100644 index 000000000000..d964ebda4ee2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.factories.Factory; + +import java.io.Serializable; + +/** Factory to create {@link CatalogLock}. */ +public interface CatalogLockFactory extends Factory, Serializable { + + CatalogLock createLock(CatalogLockContext context); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index b2c08ae384f7..a6c018b490a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -18,7 +18,6 @@ package org.apache.paimon.catalog; -import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -36,7 +35,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; @@ -48,8 +46,6 @@ public class FileSystemCatalog extends AbstractCatalog { private final Path warehouse; - private ClientPool.ClientPoolImpl clientPool; - public FileSystemCatalog(FileIO fileIO, Path warehouse) { super(fileIO); this.warehouse = warehouse; @@ -161,7 +157,7 @@ private SchemaManager schemaManager(Identifier identifier) { lockFactory() .map( fac -> - fac.create( + fac.createLock( lockContext() .orElseThrow( () -> @@ -172,14 +168,6 @@ private SchemaManager schemaManager(Identifier identifier) { .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)); } - @Override - public Optional lockContext() { - if (clientPool == null) { - this.clientPool = LockContextUtils.tryInitializeClientPool(catalogOptions); - } - return LockContextUtils.lockContext(this.clientPool, catalogOptions, "filesystem"); - } - @Override public void renameTableImpl(Identifier fromTable, Identifier toTable) { Path fromPath = getDataTableLocation(fromTable); @@ -211,9 +199,7 @@ private static String database(Path path) { } @Override - public void close() throws Exception { - LockContextUtils.close(clientPool); - } + public void close() throws Exception {} @Override public String warehouse() { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java deleted file mode 100644 index 699c54de4474..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.catalog; - -import org.apache.paimon.client.ClientPool; -import org.apache.paimon.jdbc.JdbcCatalogFactory; -import org.apache.paimon.jdbc.JdbcCatalogLock; -import org.apache.paimon.jdbc.JdbcClientPool; -import org.apache.paimon.jdbc.JdbcUtils; -import org.apache.paimon.options.CatalogOptions; -import org.apache.paimon.options.Options; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.Optional; - -/** Utils for {@link org.apache.paimon.catalog.CatalogLock.LockContext}. */ -public class LockContextUtils { - - private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalog.class); - - public static Optional lockContext( - ClientPool.ClientPoolImpl clientPool, Options catalogOptions, String catalogKey) { - if (clientPool == null) { - return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions)); - } - String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE); - switch (lockType) { - case JdbcCatalogFactory.IDENTIFIER: - JdbcClientPool connections = (JdbcClientPool) clientPool; - return Optional.of( - new JdbcCatalogLock.JdbcLockContext( - connections, catalogKey, catalogOptions)); - default: - LOG.warn("Unsupported lock type:" + lockType); - return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions)); - } - } - - public static ClientPool.ClientPoolImpl tryInitializeClientPool(Options catalogOptions) { - String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE); - if (lockType == null) { - return null; - } - switch (lockType) { - case JdbcCatalogFactory.IDENTIFIER: - JdbcClientPool connections = - new JdbcClientPool( - catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE), - catalogOptions.get(CatalogOptions.URI.key()), - catalogOptions.toMap()); - try { - JdbcUtils.createDistributedLockTable(connections, catalogOptions); - } catch (SQLException e) { - throw new RuntimeException("Cannot initialize JDBC distributed lock.", e); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted in call to initialize", e); - } - return connections; - default: - LOG.warn("Unsupported lock type:" + lockType); - return null; - } - } - - public static void close(ClientPool.ClientPoolImpl clientPool) { - if (clientPool == null) { - return; - } - if (clientPool instanceof JdbcClientPool) { - JdbcClientPool connections = (JdbcClientPool) clientPool; - if (!connections.isClosed()) { - connections.close(); - } - } else { - clientPool.close(); - } - clientPool = null; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f83402f36ab2..c81af9b4c913 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -20,7 +20,8 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; -import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -344,8 +345,13 @@ public boolean caseSensitive() { } @Override - public Optional lockContext() { - return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, catalogKey, options)); + public Optional defaultLockFactory() { + return Optional.of(new JdbcCatalogLockFactory()); + } + + @Override + public Optional lockContext() { + return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options)); } private Lock lock(Identifier identifier) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java index adaaf3f43632..6c3c1d0e41cc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -25,9 +25,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; -import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; -import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; - /** Factory to create {@link JdbcCatalog}. */ public class JdbcCatalogFactory implements CatalogFactory { @@ -42,11 +39,6 @@ public String identifier() { public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { Options options = context.options(); String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY); - if (options.get(LOCK_ENABLED)) { - if (!options.getOptional(LOCK_TYPE).isPresent()) { - options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER); - } - } return new JdbcCatalog(fileIO, catalogKey, context.options(), warehouse.toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java index 2d409f12a693..307f92f0a570 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -19,7 +19,6 @@ package org.apache.paimon.jdbc; import org.apache.paimon.catalog.CatalogLock; -import org.apache.paimon.options.Options; import org.apache.paimon.utils.TimeUtils; import java.io.IOException; @@ -87,41 +86,6 @@ public void close() throws IOException { // Do nothing } - /** Jdbc catalog lock factory. */ - public static class JdbcCatalogLockFactory implements LockFactory { - - private static final long serialVersionUID = 1L; - public static final String IDENTIFIER = "jdbc"; - - @Override - public String identifier() { - return IDENTIFIER; - } - - @Override - public CatalogLock create(LockContext context) { - JdbcLockContext lockContext = (JdbcLockContext) context; - return new JdbcCatalogLock( - lockContext.connections, - lockContext.catalogKey, - checkMaxSleep(lockContext.conf.toMap()), - acquireTimeout(lockContext.conf.toMap())); - } - } - - /** Jdbc lock context. */ - public static class JdbcLockContext implements LockContext { - private final JdbcClientPool connections; - private final String catalogKey; - private final Options conf; - - public JdbcLockContext(JdbcClientPool connections, String catalogKey, Options conf) { - this.connections = connections; - this.catalogKey = catalogKey; - this.conf = conf; - } - } - public static long checkMaxSleep(Map conf) { return TimeUtils.parseDuration( conf.getOrDefault( diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java new file mode 100644 index 000000000000..e56b3474ccb3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.options.Options; + +/** Jdbc lock context. */ +public class JdbcCatalogLockContext implements CatalogLockContext { + + private final JdbcClientPool connections; + private final String catalogKey; + private final Options options; + + public JdbcCatalogLockContext(JdbcClientPool connections, String catalogKey, Options options) { + this.connections = connections; + this.catalogKey = catalogKey; + this.options = options; + } + + @Override + public Options options() { + return options; + } + + public JdbcClientPool connections() { + return connections; + } + + public String catalogKey() { + return catalogKey; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java new file mode 100644 index 000000000000..ce0a2d24eea5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockFactory; + +import java.util.Map; + +import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; +import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; + +/** Jdbc catalog lock factory. */ +public class JdbcCatalogLockFactory implements CatalogLockFactory { + + private static final long serialVersionUID = 1L; + + public static final String IDENTIFIER = "jdbc"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public CatalogLock createLock(CatalogLockContext context) { + JdbcCatalogLockContext lockContext = (JdbcCatalogLockContext) context; + Map optionsMap = lockContext.options().toMap(); + return new JdbcCatalogLock( + lockContext.connections(), + lockContext.catalogKey(), + checkMaxSleep(optionsMap), + acquireTimeout(optionsMap)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java index a9f27e70ae6a..76cd8b178d2c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java @@ -20,6 +20,8 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; import javax.annotation.Nullable; @@ -44,12 +46,12 @@ interface Factory extends Serializable { } static Factory factory( - @Nullable CatalogLock.LockFactory lockFactory, - @Nullable CatalogLock.LockContext lockContext, + @Nullable CatalogLockFactory lockFactory, + @Nullable CatalogLockContext lockContext, Identifier tablePath) { return lockFactory == null ? new EmptyFactory() - : new CatalogLockFactory(lockFactory, lockContext, tablePath); + : new LockFactory(lockFactory, lockContext, tablePath); } static Factory emptyFactory() { @@ -57,17 +59,17 @@ static Factory emptyFactory() { } /** A {@link Factory} creating lock from catalog. */ - class CatalogLockFactory implements Factory { + class LockFactory implements Factory { private static final long serialVersionUID = 1L; - private final CatalogLock.LockFactory lockFactory; - private final CatalogLock.LockContext lockContext; + private final CatalogLockFactory lockFactory; + private final CatalogLockContext lockContext; private final Identifier tablePath; - public CatalogLockFactory( - CatalogLock.LockFactory lockFactory, - CatalogLock.LockContext lockContext, + public LockFactory( + CatalogLockFactory lockFactory, + CatalogLockContext lockContext, Identifier tablePath) { this.lockFactory = lockFactory; this.lockContext = lockContext; @@ -76,7 +78,7 @@ public CatalogLockFactory( @Override public Lock create() { - return fromCatalog(lockFactory.create(lockContext), tablePath); + return fromCatalog(lockFactory.createLock(lockContext), tablePath); } } diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 34de4106bada..0f87c96b0d4e 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -15,4 +15,4 @@ org.apache.paimon.catalog.FileSystemCatalogFactory org.apache.paimon.jdbc.JdbcCatalogFactory -org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory +org.apache.paimon.jdbc.JdbcCatalogLockFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java deleted file mode 100644 index 0948ab07c4c2..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.catalog; - -import org.apache.paimon.fs.Path; -import org.apache.paimon.jdbc.JdbcCatalog; -import org.apache.paimon.options.CatalogOptions; -import org.apache.paimon.options.Options; - -import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; - -import org.junit.jupiter.api.BeforeEach; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Tests for {@link FileSystemCatalog}. */ -public class FileSystemCatalogTest extends CatalogTestBase { - - @BeforeEach - public void setUp() throws Exception { - super.setUp(); - catalog = initCatalog(Maps.newHashMap()); - } - - private FileSystemCatalog initCatalog(Map props) { - Map properties = Maps.newHashMap(); - properties.put( - CatalogOptions.URI.key(), - "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); - - properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); - properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); - properties.put(CatalogOptions.WAREHOUSE.key(), warehouse); - properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); - properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); - properties.putAll(props); - FileSystemCatalog catalog = - new FileSystemCatalog(fileIO, new Path(warehouse), Options.fromMap(properties)); - return catalog; - } - - @Override - public void testListDatabasesWhenNoDatabases() { - List databases = catalog.listDatabases(); - assertThat(databases).isEqualTo(new ArrayList<>()); - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index b68d65dd2251..50d28cd112fc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -21,6 +21,8 @@ import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; @@ -154,18 +156,6 @@ public void testCatalogOptionsInheritAndOverride() throws Exception { @Test void testCatalogWithLockForSchema() throws Exception { LOCK_COUNT.set(0); - assertThatThrownBy( - () -> - tEnv.executeSql( - String.format( - "CREATE CATALOG fs_with_lock WITH (" - + "'type'='paimon', " - + "'warehouse'='%s', " - + "'lock.enabled'='true'" - + ")", - path)) - .await()) - .hasRootCauseMessage("No lock type when lock is enabled."); tEnv.executeSql( String.format( "CREATE CATALOG fs_with_lock WITH (" @@ -203,7 +193,8 @@ private List collect(String sql) throws Exception { } /** Lock factory for file system catalog. */ - public static class FileSystemCatalogDummyLockFactory implements CatalogLock.LockFactory { + public static class FileSystemCatalogDummyLockFactory implements CatalogLockFactory { + private static final String IDENTIFIER = "DUMMY"; @Override @@ -212,7 +203,7 @@ public String identifier() { } @Override - public CatalogLock create(CatalogLock.LockContext context) { + public CatalogLock createLock(CatalogLockContext context) { return new CatalogLock() { @Override public T runWithLock(String database, String table, Callable callable) diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 2d64f88061c5..4dc8daa6007c 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -23,7 +23,8 @@ import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -77,15 +78,12 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; -import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; -import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; -import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -93,6 +91,7 @@ /** A catalog implementation for Hive. */ public class HiveCatalog extends AbstractCatalog { + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); // Reserved properties @@ -149,10 +148,15 @@ public HiveCatalog( } @Override - public Optional lockContext() { + public Optional defaultLockFactory() { + return Optional.of(new HiveCatalogLockFactory()); + } + + @Override + public Optional lockContext() { return Optional.of( - new HiveCatalogLock.HiveLockContext( - new SerializableHiveConf(hiveConf), clientClassName)); + new HiveCatalogLockContext( + new SerializableHiveConf(hiveConf), clientClassName, catalogOptions)); } @Override @@ -635,7 +639,7 @@ public static HiveConf createHiveConf( try (InputStream inputStream = hiveSite.getFileSystem(hadoopConf).open(hiveSite)) { hiveConf.addResource(inputStream, hiveSite.toString()); // trigger a read from the conf to avoid input stream is closed - isEmbeddedMetastore(hiveConf); + hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS); } catch (IOException e) { throw new RuntimeException( "Failed to load hive-site.xml from specified path:" + hiveSite, e); @@ -656,10 +660,6 @@ public static HiveConf createHiveConf( } } - public static boolean isEmbeddedMetastore(HiveConf hiveConf) { - return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)); - } - public static Catalog createHiveCatalog(CatalogContext context) { HiveConf hiveConf = createHiveConf(context); Options options = context.options(); @@ -680,14 +680,6 @@ public static Catalog createHiveCatalog(CatalogContext context) { } catch (IOException e) { throw new UncheckedIOException(e); } - - /** Hive catalog only support hive lock. */ - if (options.getOptional(LOCK_ENABLED).orElse(false)) { - Optional lockType = options.getOptional(LOCK_TYPE); - if (!lockType.isPresent()) { - options.set(LOCK_TYPE, LOCK_IDENTIFIER); - } - } return new HiveCatalog( fileIO, hiveConf, diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java index c49cd020c654..8c3d3829ea28 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java @@ -39,7 +39,6 @@ import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT; import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP; -import static org.apache.paimon.utils.Preconditions.checkArgument; /** Hive {@link CatalogLock}. */ public class HiveCatalogLock implements CatalogLock { @@ -114,28 +113,6 @@ public void close() { this.client.close(); } - /** Catalog lock factory for hive. */ - public static class HiveCatalogLockFactory implements LockFactory { - - private static final long serialVersionUID = 1L; - - @Override - public CatalogLock create(LockContext context) { - checkArgument(context instanceof HiveLockContext); - HiveLockContext hiveLockContext = (HiveLockContext) context; - HiveConf conf = hiveLockContext.hiveConf.conf(); - return new HiveCatalogLock( - HiveCatalog.createClient(conf, hiveLockContext.clientClassName), - checkMaxSleep(conf), - acquireTimeout(conf)); - } - - @Override - public String identifier() { - return LOCK_IDENTIFIER; - } - } - public static long checkMaxSleep(HiveConf conf) { return TimeUtils.parseDuration( conf.get( @@ -151,14 +128,4 @@ public static long acquireTimeout(HiveConf conf) { TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue()))) .toMillis(); } - - static class HiveLockContext implements LockContext { - private final SerializableHiveConf hiveConf; - private final String clientClassName; - - public HiveLockContext(SerializableHiveConf hiveConf, String clientClassName) { - this.hiveConf = hiveConf; - this.clientClassName = clientClassName; - } - } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java new file mode 100644 index 000000000000..ecffd7f1e633 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.options.Options; + +/** Hive {@link CatalogLockContext}. */ +public class HiveCatalogLockContext implements CatalogLockContext { + + private final SerializableHiveConf hiveConf; + private final String clientClassName; + private final Options options; + + public HiveCatalogLockContext( + SerializableHiveConf hiveConf, String clientClassName, Options options) { + this.hiveConf = hiveConf; + this.clientClassName = clientClassName; + this.options = options; + } + + @Override + public Options options() { + return options; + } + + public SerializableHiveConf hiveConf() { + return hiveConf; + } + + public String clientClassName() { + return clientClassName; + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java new file mode 100644 index 000000000000..7c05ce3ee520 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLockFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockFactory; + +import org.apache.hadoop.hive.conf.HiveConf; + +import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER; +import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; +import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Catalog lock factory for hive. */ +public class HiveCatalogLockFactory implements CatalogLockFactory { + + private static final long serialVersionUID = 1L; + + @Override + public CatalogLock createLock(CatalogLockContext context) { + checkArgument(context instanceof HiveCatalogLockContext); + HiveCatalogLockContext hiveLockContext = (HiveCatalogLockContext) context; + HiveConf conf = hiveLockContext.hiveConf().conf(); + return new HiveCatalogLock( + HiveCatalog.createClient(conf, hiveLockContext.clientClassName()), + checkMaxSleep(conf), + acquireTimeout(conf)); + } + + @Override + public String identifier() { + return LOCK_IDENTIFIER; + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index d4af13cc08e6..baab92184129 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -14,6 +14,4 @@ # limitations under the License. org.apache.paimon.hive.HiveCatalogFactory - -# Hive catalog lock factory -org.apache.paimon.hive.HiveCatalogLock$HiveCatalogLockFactory +org.apache.paimon.hive.HiveCatalogLockFactory diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index 668c88f1f32c..4d9753babc34 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.hive.annotation.Minio; @@ -695,7 +696,7 @@ public void testHiveLock() throws InterruptedException { tEnv.executeSql("CREATE TABLE t (a INT)"); Catalog catalog = ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog(); - CatalogLock.LockFactory lockFactory = catalog.lockFactory().get(); + CatalogLockFactory lockFactory = catalog.lockFactory().get(); AtomicInteger count = new AtomicInteger(0); List threads = new ArrayList<>(); @@ -710,7 +711,8 @@ public void testHiveLock() throws InterruptedException { Thread thread = new Thread( () -> { - CatalogLock lock = lockFactory.create(catalog.lockContext().get()); + CatalogLock lock = + lockFactory.createLock(catalog.lockContext().get()); for (int j = 0; j < 10; j++) { try { lock.runWithLock("test_db", "t", unsafeIncrement);