From b1da320cc48b68c74beb72a2ca84fcab27cc291a Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Sat, 10 Feb 2024 00:40:33 +0800 Subject: [PATCH 1/4] [Core]Add filesystem catalog lock --- .../apache/paimon/catalog/CatalogLock.java | 2 + .../paimon/catalog/FileSystemCatalog.java | 9 +- .../paimon/catalog/FileSystemCatalogLock.java | 221 ++++++++++++++++++ .../org/apache/paimon/catalog/LockState.java | 31 +++ .../apache/paimon/catalog/RetryHelper.java | 144 ++++++++++++ .../paimon/flink/FileSystemCatalogITCase.java | 73 ++++++ .../apache/paimon/hive/HiveCatalogLock.java | 5 + 7 files changed, 484 insertions(+), 1 deletion(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/LockState.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/RetryHelper.java diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java index 278b3ad631af..4aea5f90ddbf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java @@ -35,6 +35,8 @@ public interface CatalogLock extends Closeable { /** Run with catalog lock. The caller should tell catalog the database and table name. */ T runWithLock(String database, String table, Callable callable) throws Exception; + String getLock(); + /** Factory to create {@link CatalogLock}. */ interface Factory extends Serializable { CatalogLock create(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index e458dad7c34e..8a14fb9d1148 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -38,6 +38,7 @@ import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; /** A catalog implementation for {@link FileIO}. */ public class FileSystemCatalog extends AbstractCatalog { @@ -58,7 +59,13 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) { @Override public Optional lockFactory() { - return Optional.empty(); + return lockEnabled() + ? Optional.of(FileSystemCatalogLock.createFactory(fileIO, warehouse)) + : Optional.empty(); + } + + private boolean lockEnabled() { + return Boolean.parseBoolean(catalogOptions.get(LOCK_ENABLED.key())); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java new file mode 100644 index 000000000000..c6cf3f3a262f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.Callable; + +/** FileSystemCatalogLock. */ +public class FileSystemCatalogLock implements CatalogLock { + + private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalogLock.class); + private static final String LOCK_FILE_NAME = "lock"; + private static final Object lockObject = new Object(); + private FileIO fs; + private final transient Path lockFile; + private Path warehouse; + private final long checkMaxSleep; + private final long acquireTimeout; + + private final RetryHelper lockRetryHelper; + + public FileSystemCatalogLock( + FileIO fileIO, Path warehouse, long checkMaxSleep, long acquireTimeout) { + this.fs = fileIO; + this.warehouse = warehouse; + this.lockFile = new Path(warehouse, "lock"); + this.checkMaxSleep = checkMaxSleep; + this.acquireTimeout = acquireTimeout; + lockRetryHelper = + new RetryHelper<>( + 1000, + 15, + 1000, + Arrays.asList(RuntimeException.class, InterruptedException.class), + "acquire lock"); + } + + // private void lock() { + // int cnt =15; + // while() + // while (true) { + // if (tryLock()) { + // break; + // } + // } + // } + + // public void lock() { + // // lockRetryHelper.start(() -> { + // // try { + // // if (!tryLock()) { + // // throw new RuntimeException("Unable to acquire the lock. Current + // lock + // // owner information : "); + // // } + // // return true; + // // } catch (Exception e) { + // // throw new RuntimeException(e); + // // } + // // }); + // int retryCount = 0; + // boolean acquired = false; + // + // while (retryCount <= 15) { + // try { + // acquired = tryLock(); + // if (acquired) { + // break; + // } + // LOG.info("Retrying to acquire lock..."); + // Thread.sleep(1000); + // retryCount++; + // } catch (InterruptedException e) { + // if (retryCount >= 15) { + // throw new RuntimeException("Unable to acquire lock, lock object ", e); + // } + // } + // } + // } + + public boolean tryLock() { + try { + synchronized (LOCK_FILE_NAME) { + // synchronized (lockObject) { + // Check whether lock is already expired, if so try to delete lock file + // if (fs.exists(this.lockFile) && checkIfExpired()) { + if (fs.exists(this.lockFile)) { + fs.delete(this.lockFile, true); + } + acquireLock(); + return fs.exists(this.lockFile); + } + } catch (IOException e) { + LOG.info(generateLogInfo(LockState.FAILED_TO_ACQUIRE), e); + return false; + } + } + + public void unlock() { + synchronized (LOCK_FILE_NAME) { + // synchronized (lockObject) { + try { + if (fs.exists(this.lockFile)) { + fs.delete(this.lockFile, true); + } + } catch (IOException io) { + throw new RuntimeException(generateLogInfo(LockState.FAILED_TO_RELEASE), io); + } + } + } + + private void acquireLock() { + try { + // synchronized (lockObject) { + synchronized (LOCK_FILE_NAME) { + // fs.writeFileUtf8(this.lockFile, ""); + fs.newOutputStream(this.lockFile, false).close(); + } + // fs.create(this.lockFile, false).close(); + } catch (IOException e) { + throw new RuntimeException(generateLogInfo(LockState.FAILED_TO_ACQUIRE), e); + } + } + // public void close() throws IOException { + // + // } + + // private boolean checkIfExpired() { + // if (lockTimeoutMinutes == 0) { + // return false; + // } + // try { + // long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime(); + // if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * + // 1000L) { + // return true; + // } + // } catch (IOException | HoodieIOException e) { + // LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get + // lockFile's modification time", e); + // } + // return false; + // } + + public String getLock() { + return this.lockFile.toString(); + } + + protected String generateLogInfo(LockState state) { + return String.format("%s lock at: %s", state.name(), getLock()); + } + + public static CatalogLock.Factory createFactory(FileIO fileIO, Path warehouse) { + return new FileSystemCatalogLockFactory(fileIO, warehouse); + } + + @Override + public T runWithLock(String database, String table, Callable callable) throws Exception { + try { + while (!tryLock()) { + Thread.sleep(1000); + } + return callable.call(); + } finally { + unlock(); + } + } + + @Override + public void close() throws IOException { + synchronized (LOCK_FILE_NAME) { + // synchronized (lockObject) { + try { + fs.delete(this.lockFile, true); + } catch (IOException e) { + throw new RuntimeException(generateLogInfo(LockState.FAILED_TO_RELEASE), e); + } + } + } + + private static class FileSystemCatalogLockFactory implements CatalogLock.Factory { + + private static final long serialVersionUID = 1L; + + private final FileIO fileIO; + private final Path warehouse; + + public FileSystemCatalogLockFactory(FileIO fileIO, Path warehouse) { + this.fileIO = fileIO; + this.warehouse = warehouse; + } + + @Override + public CatalogLock create() { + return new FileSystemCatalogLock(fileIO, warehouse, 1000, 1000); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/LockState.java b/paimon-core/src/main/java/org/apache/paimon/catalog/LockState.java new file mode 100644 index 000000000000..2feb22564125 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/LockState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +/** Enum to signal the state of the lock. */ +public enum LockState { + ACQUIRING, + ACQUIRED, + ALREADY_ACQUIRED, + RELEASING, + RELEASED, + ALREADY_RELEASED, + FAILED_TO_ACQUIRE, + FAILED_TO_RELEASE +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RetryHelper.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RetryHelper.java new file mode 100644 index 000000000000..753cece10ca0 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/RetryHelper.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +/** + * Retry Helper implementation. + * + * @param Type of return value for checked function. + */ +public class RetryHelper implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(RetryHelper.class); + private static final List> DEFAULT_RETRY_EXCEPTIONS = + Arrays.asList(IOException.class, RuntimeException.class); + private transient CheckedFunction func; + private final int num; + private final long maxIntervalTime; + private final long initialIntervalTime; + private String taskInfo = "N/A"; + private List> retryExceptionsClasses; + + public RetryHelper( + long maxRetryIntervalMs, + int maxRetryNumbers, + long initialRetryIntervalMs, + List> retryExceptions, + String taskInfo) { + this.num = maxRetryNumbers; + this.initialIntervalTime = initialRetryIntervalMs; + this.maxIntervalTime = maxRetryIntervalMs; + this.retryExceptionsClasses = retryExceptions; + this.taskInfo = taskInfo; + } + + // public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long + // initialRetryIntervalMs, String retryExceptions, String taskInfo) { + // this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions); + // this.taskInfo = taskInfo; + // } + + public RetryHelper tryWith(CheckedFunction func) { + this.func = func; + return this; + } + + public T start(CheckedFunction func) throws R { + int retries = 0; + T functionResult = null; + + while (true) { + long waitTime = Math.min(getWaitTimeExp(retries), maxIntervalTime); + try { + functionResult = func.get(); + break; + } catch (Exception e) { + if (!checkIfExceptionInRetryList(e)) { + throw e; + } + if (retries++ >= num) { + String message = + "Still failed to " + taskInfo + " after retried " + num + " times."; + LOG.error(message, e); + throw e; + } + LOG.warn( + "Catch Exception for " + + taskInfo + + ", will retry after " + + waitTime + + " ms.", + e); + try { + Thread.sleep(waitTime); + } catch (InterruptedException ex) { + // ignore InterruptedException here + } + } + } + + if (retries > 0) { + LOG.info("Success to " + taskInfo + " after retried " + retries + " times."); + } + + return functionResult; + } + + public T start() throws R { + return start(this.func); + } + + private boolean checkIfExceptionInRetryList(Exception e) { + boolean inRetryList = false; + for (Class clazz : retryExceptionsClasses) { + if (clazz.isInstance(e)) { + inRetryList = true; + break; + } + } + return inRetryList; + } + + private long getWaitTimeExp(int retryCount) { + Random random = new Random(); + if (0 == retryCount) { + return initialIntervalTime; + } + + return (long) Math.pow(2, retryCount) * initialIntervalTime + random.nextInt(100); + } + + /** + * Checked function interface. + * + * @param Type of return value. + */ + @FunctionalInterface + public interface CheckedFunction extends Serializable { + T get() throws R; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index 71f93553d49d..76008f64f02d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; @@ -38,6 +39,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -146,6 +149,76 @@ public void testCatalogOptionsInheritAndOverride() throws Exception { assertThat(tableOptions).doesNotContainKey("lock.enabled"); } + @Test + public void testFileSystemCatalogLock() throws Exception { + tEnv.executeSql( + String.format( + "CREATE CATALOG fs_with_lock WITH (" + + "'type'='paimon', " + + "'warehouse'='%s', " + + "'table-default.opt1'='value1', " + + "'table-default.opt2'='value2', " + + "'table-default.opt3'='value3', " + + "'fs.allow-hadoop-fallback'='false'," + + "'lock.enabled'='true'" + + ")", + path)); + tEnv.useCatalog("fs_with_lock"); + CatalogLock.Factory lockFactory = + ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()) + .catalog() + .lockFactory() + .get(); + + AtomicInteger count = new AtomicInteger(0); + List threads = new ArrayList<>(); + Callable unsafeIncrement = + () -> { + int nextCount = count.get() + 1; + Thread.sleep(1); + count.set(nextCount); + return null; + }; + for (int i = 0; i < 10; i++) { + Thread thread = + new Thread( + () -> { + CatalogLock lock = lockFactory.create(); + for (int j = 0; j < 10; j++) { + try { + System.out.println( + Thread.currentThread().getName() + + "拿道锁" + + lock.getLock() + + "再进行第" + + j + + "次"); + lock.runWithLock("test_db", "t", unsafeIncrement); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + thread.setName(String.valueOf(i)); + thread.start(); + threads.add(thread); + // CatalogLock lock = lockFactory.create(); + // for (int j = 0; j < 10; j++) { + // try { + // lock.runWithLock("test_db", "t", unsafeIncrement); + // } catch (Exception e) { + // throw new RuntimeException(e); + // } + // } + } + + for (Thread thread : threads) { + thread.join(); + } + + assertThat(count.get()).isEqualTo(100); + } + private void innerTestWriteRead() throws Exception { tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await(); BlockingIterator iterator = diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java index 0f6cd4837ff8..c9e8c2dcf671 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java @@ -63,6 +63,11 @@ public T runWithLock(String database, String table, Callable callable) th } } + @Override + public String getLock() { + return null; + } + private long lock(String database, String table) throws UnknownHostException, TException, InterruptedException { final LockComponent lockComponent = From 25eb1207e51cded535e8f2491b250b86124524c3 Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Sat, 10 Feb 2024 15:40:59 +0800 Subject: [PATCH 2/4] fix --- .../paimon/catalog/FileSystemCatalogLock.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java index c6cf3f3a262f..9cd04515e87e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java @@ -179,13 +179,15 @@ public static CatalogLock.Factory createFactory(FileIO fileIO, Path warehouse) { @Override public T runWithLock(String database, String table, Callable callable) throws Exception { - try { - while (!tryLock()) { - Thread.sleep(1000); + synchronized (LOCK_FILE_NAME) { + try { + while (!tryLock()) { + Thread.sleep(1000); + } + return callable.call(); + } finally { + unlock(); } - return callable.call(); - } finally { - unlock(); } } From 5545b76b7bfeb131e7633d142f11ab1ebf49582a Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Sat, 10 Feb 2024 18:23:17 +0800 Subject: [PATCH 3/4] fix test --- .../paimon/catalog/FileSystemCatalogLock.java | 123 +++------------ .../apache/paimon/catalog/RetryHelper.java | 144 ------------------ .../paimon/flink/FileSystemCatalogITCase.java | 19 --- 3 files changed, 19 insertions(+), 267 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/RetryHelper.java diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java index 9cd04515e87e..2a121ab8b0ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLock.java @@ -25,88 +25,43 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Arrays; import java.util.concurrent.Callable; -/** FileSystemCatalogLock. */ +/** + * Allows users to lock table operations using file system. Only works for file system with atomic + * operation. + */ public class FileSystemCatalogLock implements CatalogLock { private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalogLock.class); private static final String LOCK_FILE_NAME = "lock"; - private static final Object lockObject = new Object(); private FileIO fs; private final transient Path lockFile; private Path warehouse; - private final long checkMaxSleep; - private final long acquireTimeout; - - private final RetryHelper lockRetryHelper; - public FileSystemCatalogLock( - FileIO fileIO, Path warehouse, long checkMaxSleep, long acquireTimeout) { + public FileSystemCatalogLock(FileIO fileIO, Path warehouse) { this.fs = fileIO; this.warehouse = warehouse; this.lockFile = new Path(warehouse, "lock"); - this.checkMaxSleep = checkMaxSleep; - this.acquireTimeout = acquireTimeout; - lockRetryHelper = - new RetryHelper<>( - 1000, - 15, - 1000, - Arrays.asList(RuntimeException.class, InterruptedException.class), - "acquire lock"); } - // private void lock() { - // int cnt =15; - // while() - // while (true) { - // if (tryLock()) { - // break; - // } - // } - // } - - // public void lock() { - // // lockRetryHelper.start(() -> { - // // try { - // // if (!tryLock()) { - // // throw new RuntimeException("Unable to acquire the lock. Current - // lock - // // owner information : "); - // // } - // // return true; - // // } catch (Exception e) { - // // throw new RuntimeException(e); - // // } - // // }); - // int retryCount = 0; - // boolean acquired = false; - // - // while (retryCount <= 15) { - // try { - // acquired = tryLock(); - // if (acquired) { - // break; - // } - // LOG.info("Retrying to acquire lock..."); - // Thread.sleep(1000); - // retryCount++; - // } catch (InterruptedException e) { - // if (retryCount >= 15) { - // throw new RuntimeException("Unable to acquire lock, lock object ", e); - // } - // } - // } - // } + @Override + public T runWithLock(String database, String table, Callable callable) throws Exception { + try { + synchronized (LOCK_FILE_NAME) { + while (!tryLock()) { + Thread.sleep(100); + } + return callable.call(); + } + } finally { + unlock(); + } + } public boolean tryLock() { try { synchronized (LOCK_FILE_NAME) { - // synchronized (lockObject) { - // Check whether lock is already expired, if so try to delete lock file - // if (fs.exists(this.lockFile) && checkIfExpired()) { if (fs.exists(this.lockFile)) { fs.delete(this.lockFile, true); } @@ -121,7 +76,6 @@ public boolean tryLock() { public void unlock() { synchronized (LOCK_FILE_NAME) { - // synchronized (lockObject) { try { if (fs.exists(this.lockFile)) { fs.delete(this.lockFile, true); @@ -134,36 +88,13 @@ public void unlock() { private void acquireLock() { try { - // synchronized (lockObject) { synchronized (LOCK_FILE_NAME) { - // fs.writeFileUtf8(this.lockFile, ""); fs.newOutputStream(this.lockFile, false).close(); } - // fs.create(this.lockFile, false).close(); } catch (IOException e) { throw new RuntimeException(generateLogInfo(LockState.FAILED_TO_ACQUIRE), e); } } - // public void close() throws IOException { - // - // } - - // private boolean checkIfExpired() { - // if (lockTimeoutMinutes == 0) { - // return false; - // } - // try { - // long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime(); - // if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * - // 1000L) { - // return true; - // } - // } catch (IOException | HoodieIOException e) { - // LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get - // lockFile's modification time", e); - // } - // return false; - // } public String getLock() { return this.lockFile.toString(); @@ -177,24 +108,8 @@ public static CatalogLock.Factory createFactory(FileIO fileIO, Path warehouse) { return new FileSystemCatalogLockFactory(fileIO, warehouse); } - @Override - public T runWithLock(String database, String table, Callable callable) throws Exception { - synchronized (LOCK_FILE_NAME) { - try { - while (!tryLock()) { - Thread.sleep(1000); - } - return callable.call(); - } finally { - unlock(); - } - } - } - - @Override public void close() throws IOException { synchronized (LOCK_FILE_NAME) { - // synchronized (lockObject) { try { fs.delete(this.lockFile, true); } catch (IOException e) { @@ -217,7 +132,7 @@ public FileSystemCatalogLockFactory(FileIO fileIO, Path warehouse) { @Override public CatalogLock create() { - return new FileSystemCatalogLock(fileIO, warehouse, 1000, 1000); + return new FileSystemCatalogLock(fileIO, warehouse); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RetryHelper.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RetryHelper.java deleted file mode 100644 index 753cece10ca0..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/RetryHelper.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.catalog; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import java.util.Random; - -/** - * Retry Helper implementation. - * - * @param Type of return value for checked function. - */ -public class RetryHelper implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(RetryHelper.class); - private static final List> DEFAULT_RETRY_EXCEPTIONS = - Arrays.asList(IOException.class, RuntimeException.class); - private transient CheckedFunction func; - private final int num; - private final long maxIntervalTime; - private final long initialIntervalTime; - private String taskInfo = "N/A"; - private List> retryExceptionsClasses; - - public RetryHelper( - long maxRetryIntervalMs, - int maxRetryNumbers, - long initialRetryIntervalMs, - List> retryExceptions, - String taskInfo) { - this.num = maxRetryNumbers; - this.initialIntervalTime = initialRetryIntervalMs; - this.maxIntervalTime = maxRetryIntervalMs; - this.retryExceptionsClasses = retryExceptions; - this.taskInfo = taskInfo; - } - - // public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long - // initialRetryIntervalMs, String retryExceptions, String taskInfo) { - // this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions); - // this.taskInfo = taskInfo; - // } - - public RetryHelper tryWith(CheckedFunction func) { - this.func = func; - return this; - } - - public T start(CheckedFunction func) throws R { - int retries = 0; - T functionResult = null; - - while (true) { - long waitTime = Math.min(getWaitTimeExp(retries), maxIntervalTime); - try { - functionResult = func.get(); - break; - } catch (Exception e) { - if (!checkIfExceptionInRetryList(e)) { - throw e; - } - if (retries++ >= num) { - String message = - "Still failed to " + taskInfo + " after retried " + num + " times."; - LOG.error(message, e); - throw e; - } - LOG.warn( - "Catch Exception for " - + taskInfo - + ", will retry after " - + waitTime - + " ms.", - e); - try { - Thread.sleep(waitTime); - } catch (InterruptedException ex) { - // ignore InterruptedException here - } - } - } - - if (retries > 0) { - LOG.info("Success to " + taskInfo + " after retried " + retries + " times."); - } - - return functionResult; - } - - public T start() throws R { - return start(this.func); - } - - private boolean checkIfExceptionInRetryList(Exception e) { - boolean inRetryList = false; - for (Class clazz : retryExceptionsClasses) { - if (clazz.isInstance(e)) { - inRetryList = true; - break; - } - } - return inRetryList; - } - - private long getWaitTimeExp(int retryCount) { - Random random = new Random(); - if (0 == retryCount) { - return initialIntervalTime; - } - - return (long) Math.pow(2, retryCount) * initialIntervalTime + random.nextInt(100); - } - - /** - * Checked function interface. - * - * @param Type of return value. - */ - @FunctionalInterface - public interface CheckedFunction extends Serializable { - T get() throws R; - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index 76008f64f02d..422b15d37610 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -156,9 +156,6 @@ public void testFileSystemCatalogLock() throws Exception { "CREATE CATALOG fs_with_lock WITH (" + "'type'='paimon', " + "'warehouse'='%s', " - + "'table-default.opt1'='value1', " - + "'table-default.opt2'='value2', " - + "'table-default.opt3'='value3', " + "'fs.allow-hadoop-fallback'='false'," + "'lock.enabled'='true'" + ")", @@ -186,30 +183,14 @@ public void testFileSystemCatalogLock() throws Exception { CatalogLock lock = lockFactory.create(); for (int j = 0; j < 10; j++) { try { - System.out.println( - Thread.currentThread().getName() - + "拿道锁" - + lock.getLock() - + "再进行第" - + j - + "次"); lock.runWithLock("test_db", "t", unsafeIncrement); } catch (Exception e) { throw new RuntimeException(e); } } }); - thread.setName(String.valueOf(i)); thread.start(); threads.add(thread); - // CatalogLock lock = lockFactory.create(); - // for (int j = 0; j < 10; j++) { - // try { - // lock.runWithLock("test_db", "t", unsafeIncrement); - // } catch (Exception e) { - // throw new RuntimeException(e); - // } - // } } for (Thread thread : threads) { From 2705575081ace768d59ca0deefc04147cfc0a03d Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Sat, 10 Feb 2024 18:31:04 +0800 Subject: [PATCH 4/4] fix --- .../src/main/java/org/apache/paimon/catalog/CatalogLock.java | 2 -- .../main/java/org/apache/paimon/hive/HiveCatalogLock.java | 5 ----- 2 files changed, 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java index 4aea5f90ddbf..278b3ad631af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java @@ -35,8 +35,6 @@ public interface CatalogLock extends Closeable { /** Run with catalog lock. The caller should tell catalog the database and table name. */ T runWithLock(String database, String table, Callable callable) throws Exception; - String getLock(); - /** Factory to create {@link CatalogLock}. */ interface Factory extends Serializable { CatalogLock create(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java index c9e8c2dcf671..0f6cd4837ff8 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java @@ -63,11 +63,6 @@ public T runWithLock(String database, String table, Callable callable) th } } - @Override - public String getLock() { - return null; - } - private long lock(String database, String table) throws UnknownHostException, TException, InterruptedException { final LockComponent lockComponent =