diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 933b76e39a56..ab20ff5ae877 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.fs.Path; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.ArrayType; @@ -240,6 +241,13 @@ public DataFileMeta upgrade(int newLevel) { creationTime); } + public List collectFiles(DataFilePathFactory pathFactory) { + List paths = new ArrayList<>(); + paths.add(pathFactory.toPath(fileName)); + extraFiles.forEach(f -> paths.add(pathFactory.toPath(f))); + return paths; + } + public DataFileMeta copy(List newExtraFiles) { return new DataFileMeta( fileName, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index a1025d9f6452..774c11f74d76 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -19,9 +19,11 @@ package org.apache.paimon.operation; import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.utils.FileStorePathFactory; import java.util.List; import java.util.Map; @@ -82,4 +84,8 @@ void overwrite( /** With metrics to measure commits. */ FileStoreCommit withMetrics(CommitMetrics metrics); + + FileStorePathFactory pathFactory(); + + FileIO fileIO(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 11982c4b578a..b4b230653d2f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -504,6 +504,16 @@ public FileStoreCommit withMetrics(CommitMetrics metrics) { return this; } + @Override + public FileStorePathFactory pathFactory() { + return pathFactory; + } + + @Override + public FileIO fileIO() { + return fileIO; + } + private void collectChanges( List commitMessages, List appendTableFiles, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 8fe553e96c74..83914744b7fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -20,6 +20,11 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.FileStoreCommit; @@ -29,7 +34,9 @@ import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.tag.TagAutoCreation; import org.apache.paimon.utils.ExecutorThreadFactory; +import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.Pair; import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors; @@ -38,17 +45,23 @@ import javax.annotation.Nullable; +import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.ExpireExecutionMode; @@ -230,11 +243,79 @@ public int filterAndCommitMultiple(List committables) { .sorted(Comparator.comparingLong(ManifestCommittable::identifier)) .collect(Collectors.toList()); if (retryCommittables.size() > 0) { + checkFilesExistence(retryCommittables); commitMultiple(retryCommittables); } return retryCommittables.size(); } + private void checkFilesExistence(List committables) { + List files = new ArrayList<>(); + Map, DataFilePathFactory> factoryMap = new HashMap<>(); + for (ManifestCommittable committable : committables) { + for (CommitMessage message : committable.fileCommittables()) { + CommitMessageImpl msg = (CommitMessageImpl) message; + DataFilePathFactory pathFactory = + factoryMap.computeIfAbsent( + Pair.of(message.partition(), message.bucket()), + k -> + commit.pathFactory() + .createDataFilePathFactory( + k.getKey(), k.getValue())); + + Consumer collector = f -> files.addAll(f.collectFiles(pathFactory)); + msg.newFilesIncrement().newFiles().forEach(collector); + msg.newFilesIncrement().changelogFiles().forEach(collector); + msg.compactIncrement().compactBefore().forEach(collector); + msg.compactIncrement().compactAfter().forEach(collector); + msg.indexIncrement().newIndexFiles().stream() + .map(IndexFileMeta::fileName) + .map(pathFactory::toPath) + .forEach(files::add); + } + } + + Predicate nonExists = + p -> { + try { + return !commit.fileIO().exists(p); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + List nonExistFiles; + try { + nonExistFiles = + FileUtils.COMMON_IO_FORK_JOIN_POOL + .submit( + () -> + files.parallelStream() + .filter(nonExists) + .collect(Collectors.toList())) + .get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + + if (nonExistFiles.size() > 0) { + String message = + String.join( + "\n", + "Cannot recover from this checkpoint because some files in the snapshot that" + + " need to be resubmitted have been deleted:", + " " + + nonExistFiles.stream() + .map(Object::toString) + .collect(Collectors.joining(",")), + " The most likely reason is because you are recovering from a very old savepoint that" + + " contains some uncommitted files that have already been deleted."); + throw new RuntimeException(message); + } + } + private void expire(long partitionExpireIdentifier, ExecutorService executor) { if (expireError.get() != null) { throw new RuntimeException(expireError.get()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java index b849fd0238f9..cba062894299 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; @@ -37,10 +38,12 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ExceptionUtils; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.FileStorePathFactory; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -54,6 +57,7 @@ import java.util.stream.LongStream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link TableCommit}. */ public class TableCommitTest { @@ -167,4 +171,66 @@ public void call(List committables) { @Override public void close() throws Exception {} } + + @Test + public void testRecoverDeletedFiles() throws Exception { + String path = tempDir.toString(); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.BIGINT()}, + new String[] {"k", "v"}); + + Options conf = new Options(); + conf.set(CoreOptions.PATH, path); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), new Path(path)), + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.singletonList("k"), + conf.toMap(), + "")); + + FileStoreTable table = + FileStoreTableFactory.create( + LocalFileIO.create(), + new Path(path), + tableSchema, + new CatalogEnvironment(Lock.emptyFactory(), null, null)); + + String commitUser = UUID.randomUUID().toString(); + StreamTableWrite write = table.newWrite(commitUser); + write.write(GenericRow.of(0, 0L)); + List messages0 = write.prepareCommit(true, 0); + + write.write(GenericRow.of(1, 1L)); + List messages1 = write.prepareCommit(true, 1); + write.close(); + + StreamTableCommit commit = table.newCommit(commitUser); + commit.commit(0, messages0); + + // delete files for commit0 and commit1 + for (CommitMessageImpl message : + Arrays.asList( + (CommitMessageImpl) messages0.get(0), + (CommitMessageImpl) messages1.get(0))) { + DataFilePathFactory pathFactory = + new FileStorePathFactory(new Path(path)) + .createDataFilePathFactory(message.partition(), message.bucket()); + Path file = + message.newFilesIncrement().newFiles().get(0).collectFiles(pathFactory).get(0); + LocalFileIO.create().delete(file, true); + } + + // commit 0, fine, it will be filtered + commit.filterAndCommit(Collections.singletonMap(0L, messages0)); + + // commit 1, exception now. + assertThatThrownBy(() -> commit.filterAndCommit(Collections.singletonMap(1L, messages1))) + .hasMessageContaining( + "Cannot recover from this checkpoint because some files in the" + + " snapshot that need to be resubmitted have been deleted"); + } }