diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index e458dad7c34e..8a14fb9d1148 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -38,6 +38,7 @@ import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; /** A catalog implementation for {@link FileIO}. */ public class FileSystemCatalog extends AbstractCatalog { @@ -58,7 +59,13 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) { @Override public Optional lockFactory() { - return Optional.empty(); + return lockEnabled() + ? Optional.of(FileSystemCatalogLock.createFactory(fileIO, warehouse)) + : Optional.empty(); + } + + private boolean lockEnabled() { + return Boolean.parseBoolean(catalogOptions.get(LOCK_ENABLED.key())); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java new file mode 100644 index 000000000000..2a121ab8b0ca --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; + +/** + * Allows users to lock table operations using file system. Only works for file system with atomic + * operation. + */ +public class FileSystemCatalogLock implements CatalogLock { + + private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalogLock.class); + private static final String LOCK_FILE_NAME = "lock"; + private FileIO fs; + private final transient Path lockFile; + private Path warehouse; + + public FileSystemCatalogLock(FileIO fileIO, Path warehouse) { + this.fs = fileIO; + this.warehouse = warehouse; + this.lockFile = new Path(warehouse, "lock"); + } + + @Override + public T runWithLock(String database, String table, Callable callable) throws Exception { + try { + synchronized (LOCK_FILE_NAME) { + while (!tryLock()) { + Thread.sleep(100); + } + return callable.call(); + } + } finally { + unlock(); + } + } + + public boolean tryLock() { + try { + synchronized (LOCK_FILE_NAME) { + if (fs.exists(this.lockFile)) { + fs.delete(this.lockFile, true); + } + acquireLock(); + return fs.exists(this.lockFile); + } + } catch (IOException e) { + LOG.info(generateLogInfo(LockState.FAILED_TO_ACQUIRE), e); + return false; + } + } + + public void unlock() { + synchronized (LOCK_FILE_NAME) { + try { + if (fs.exists(this.lockFile)) { + fs.delete(this.lockFile, true); + } + } catch (IOException io) { + throw new RuntimeException(generateLogInfo(LockState.FAILED_TO_RELEASE), io); + } + } + } + + private void acquireLock() { + try { + synchronized (LOCK_FILE_NAME) { + fs.newOutputStream(this.lockFile, false).close(); + } + } catch (IOException e) { + throw new RuntimeException(generateLogInfo(LockState.FAILED_TO_ACQUIRE), e); + } + } + + public String getLock() { + return this.lockFile.toString(); + } + + protected String generateLogInfo(LockState state) { + return String.format("%s lock at: %s", state.name(), getLock()); + } + + public static CatalogLock.Factory createFactory(FileIO fileIO, Path warehouse) { + return new FileSystemCatalogLockFactory(fileIO, warehouse); + } + + public void close() throws IOException { + synchronized (LOCK_FILE_NAME) { + try { + fs.delete(this.lockFile, true); + } catch (IOException e) { + throw new RuntimeException(generateLogInfo(LockState.FAILED_TO_RELEASE), e); + } + } + } + + private static class FileSystemCatalogLockFactory implements CatalogLock.Factory { + + private static final long serialVersionUID = 1L; + + private final FileIO fileIO; + private final Path warehouse; + + public FileSystemCatalogLockFactory(FileIO fileIO, Path warehouse) { + this.fileIO = fileIO; + this.warehouse = warehouse; + } + + @Override + public CatalogLock create() { + return new FileSystemCatalogLock(fileIO, warehouse); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/LockState.java b/paimon-core/src/main/java/org/apache/paimon/catalog/LockState.java new file mode 100644 index 000000000000..2feb22564125 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/LockState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +/** Enum to signal the state of the lock. */ +public enum LockState { + ACQUIRING, + ACQUIRED, + ALREADY_ACQUIRED, + RELEASING, + RELEASED, + ALREADY_RELEASED, + FAILED_TO_ACQUIRE, + FAILED_TO_RELEASE +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index 71f93553d49d..422b15d37610 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; @@ -38,6 +39,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -146,6 +149,57 @@ public void testCatalogOptionsInheritAndOverride() throws Exception { assertThat(tableOptions).doesNotContainKey("lock.enabled"); } + @Test + public void testFileSystemCatalogLock() throws Exception { + tEnv.executeSql( + String.format( + "CREATE CATALOG fs_with_lock WITH (" + + "'type'='paimon', " + + "'warehouse'='%s', " + + "'fs.allow-hadoop-fallback'='false'," + + "'lock.enabled'='true'" + + ")", + path)); + tEnv.useCatalog("fs_with_lock"); + CatalogLock.Factory lockFactory = + ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()) + .catalog() + .lockFactory() + .get(); + + AtomicInteger count = new AtomicInteger(0); + List threads = new ArrayList<>(); + Callable unsafeIncrement = + () -> { + int nextCount = count.get() + 1; + Thread.sleep(1); + count.set(nextCount); + return null; + }; + for (int i = 0; i < 10; i++) { + Thread thread = + new Thread( + () -> { + CatalogLock lock = lockFactory.create(); + for (int j = 0; j < 10; j++) { + try { + lock.runWithLock("test_db", "t", unsafeIncrement); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + thread.start(); + threads.add(thread); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertThat(count.get()).isEqualTo(100); + } + private void innerTestWriteRead() throws Exception { tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await(); BlockingIterator iterator =