diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java new file mode 100644 index 000000000000..1b7b2c8bb96d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.FileUtils; +import org.apache.paimon.utils.Preconditions; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** Entry representing a file. */ +public interface FileEntry { + + FileKind kind(); + + BinaryRow partition(); + + int bucket(); + + int level(); + + String fileName(); + + Identifier identifier(); + + BinaryRow minKey(); + + BinaryRow maxKey(); + + /** + * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data + * file. 
+     */
+    class Identifier {
+        public final BinaryRow partition;
+        public final int bucket;
+        public final int level;
+        public final String fileName;
+
+        /* Cache the hash code for the string */
+        private Integer hash;
+
+        public Identifier(BinaryRow partition, int bucket, int level, String fileName) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.level = level;
+            this.fileName = fileName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Identifier)) {
+                return false;
+            }
+            Identifier that = (Identifier) o;
+            return Objects.equals(partition, that.partition)
+                    && bucket == that.bucket
+                    && level == that.level
+                    && Objects.equals(fileName, that.fileName);
+        }
+
+        @Override
+        public int hashCode() {
+            if (hash == null) {
+                hash = Objects.hash(partition, bucket, level, fileName);
+            }
+            return hash;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
+        }
+
+        public String toString(FileStorePathFactory pathFactory) {
+            return pathFactory.getPartitionString(partition)
+                    + ", bucket "
+                    + bucket
+                    + ", level "
+                    + level
+                    + ", file "
+                    + fileName;
+        }
+    }
+
+    static <T extends FileEntry> Collection<T> mergeEntries(Iterable<T> entries) {
+        LinkedHashMap<Identifier, T> map = new LinkedHashMap<>();
+        mergeEntries(entries, map);
+        return map.values();
+    }
+
+    static void mergeEntries(
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> manifestFiles,
+            Map<Identifier, ManifestEntry> map) {
+        List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
+                manifestFiles.stream()
+                        .map(
+                                manifestFileMeta ->
+                                        CompletableFuture.supplyAsync(
+                                                () ->
+                                                        manifestFile.read(
+                                                                manifestFileMeta.fileName()),
+                                                FileUtils.COMMON_IO_FORK_JOIN_POOL))
+                        .collect(Collectors.toList());
+
+        try {
+            for (CompletableFuture<List<ManifestEntry>> taskResult : manifestReadFutures) {
+                mergeEntries(taskResult.get(), map);
+            }
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException("Failed to read manifest file.", e);
+        }
+    }
+
+    static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifier, T> map) {
+        for (T entry : entries) {
+            Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    // each dataFile will only be added once and deleted once,
+                    // if we know that it is added before then both add and delete entry can be
+                    // removed because there won't be further operations on this file,
+                    // otherwise we have to keep the delete entry because the add entry must be
+                    // in the previous manifest files
+                    if (map.containsKey(identifier)) {
+                        map.remove(identifier);
+                    } else {
+                        map.put(identifier, entry);
+                    }
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
+            }
+        }
+    }
+
+    static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
+        for (T entry : entries) {
+            Preconditions.checkState(
+                    entry.kind() != FileKind.DELETE,
+                    "Trying to delete file %s which is not previously added.",
+                    entry.fileName());
+        }
+    }
+}
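To make the merge semantics above concrete, here is a standalone miniature of the ADD/DELETE cancellation that `FileEntry.mergeEntries` performs per `Identifier`. This is not Paimon code: the `Entry`, `Kind`, and `merge` names are invented for illustration, string keys stand in for `Identifier`, and Java 17 record syntax is used for brevity.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;

public class MergeSketch {
    enum Kind { ADD, DELETE }

    record Entry(Kind kind, String identifier) {}

    static Collection<Entry> merge(Iterable<Entry> entries) {
        LinkedHashMap<String, Entry> map = new LinkedHashMap<>();
        for (Entry entry : entries) {
            switch (entry.kind()) {
                case ADD:
                    if (map.containsKey(entry.identifier())) {
                        throw new IllegalStateException(
                                "Trying to add file " + entry.identifier() + " which is already added.");
                    }
                    map.put(entry.identifier(), entry);
                    break;
                case DELETE:
                    if (map.containsKey(entry.identifier())) {
                        // ADD followed by DELETE for the same file: both vanish
                        map.remove(entry.identifier());
                    } else {
                        // the matching ADD lives in an earlier manifest, keep the DELETE
                        map.put(entry.identifier(), entry);
                    }
                    break;
            }
        }
        return map.values();
    }

    public static void main(String[] args) {
        List<Entry> entries =
                List.of(
                        new Entry(Kind.ADD, "f1"),
                        new Entry(Kind.ADD, "f2"),
                        new Entry(Kind.DELETE, "f1"),  // cancels the ADD of f1 above
                        new Entry(Kind.DELETE, "f0")); // survives: f0 was added earlier
        System.out.println(merge(entries));
        // [Entry[kind=ADD, identifier=f2], Entry[kind=DELETE, identifier=f0]]
    }
}
```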
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 25b6919f0d28..34ad169ff207 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -19,29 +19,26 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
-import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.Filter;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.function.Function;
 
 import static org.apache.paimon.utils.SerializationUtils.newBytesType;
 
 /** Entry of a manifest file, representing an addition / deletion of a data file. */
-public class ManifestEntry {
+public class ManifestEntry implements FileEntry {
 
     private final FileKind kind;
     // for tables without partition this field should be a row with 0 columns (not null)
@@ -59,18 +56,41 @@ public ManifestEntry(
         this.file = file;
     }
 
+    @Override
     public FileKind kind() {
         return kind;
     }
 
+    @Override
     public BinaryRow partition() {
         return partition;
     }
 
+    @Override
     public int bucket() {
         return bucket;
     }
 
+    @Override
+    public int level() {
+        return file.level();
+    }
+
+    @Override
+    public String fileName() {
+        return file.fileName();
+    }
+
+    @Override
+    public BinaryRow minKey() {
+        return file.minKey();
+    }
+
+    @Override
+    public BinaryRow maxKey() {
+        return file.maxKey();
+    }
+
     public int totalBuckets() {
         return totalBuckets;
     }
@@ -79,6 +99,7 @@ public DataFileMeta file() {
         return file;
     }
 
+    @Override
     public Identifier identifier() {
         return new Identifier(partition, bucket, file.level(), file.fileName());
     }
@@ -116,129 +137,56 @@ public String toString() {
         return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, totalBuckets, file);
     }
 
-    public static Collection<ManifestEntry> mergeEntries(Iterable<ManifestEntry> entries) {
-        LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        mergeEntries(entries, map);
-        return map.values();
-    }
-
-    public static void mergeEntries(
-            ManifestFile manifestFile,
-            List<ManifestFileMeta> manifestFiles,
-            Map<Identifier, ManifestEntry> map) {
-        List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
-                manifestFiles.stream()
-                        .map(
-                                manifestFileMeta ->
-                                        CompletableFuture.supplyAsync(
-                                                () ->
-                                                        manifestFile.read(
-                                                                manifestFileMeta.fileName()),
-                                                FileUtils.COMMON_IO_FORK_JOIN_POOL))
-                        .collect(Collectors.toList());
-
-        try {
-            for (CompletableFuture<List<ManifestEntry>> taskResult : manifestReadFutures) {
-                mergeEntries(taskResult.get(), map);
-            }
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException("Failed to read manifest file.", e);
-        }
-    }
-
-    public static void mergeEntries(
-            Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
-        for (ManifestEntry entry : entries) {
-            ManifestEntry.Identifier identifier = entry.identifier();
-            switch (entry.kind()) {
-                case ADD:
-                    Preconditions.checkState(
-                            !map.containsKey(identifier),
-                            "Trying to add file %s which is already added.",
-                            identifier);
-                    map.put(identifier, entry);
-                    break;
-                case DELETE:
-                    // each dataFile will only be added once and deleted once,
-                    // if we know that it is added before then both add and delete entry can be
-                    // removed because there won't be further operations on this file,
-                    // otherwise we have to keep the delete entry because the add entry must be
-                    // in the previous manifest files
-                    if (map.containsKey(identifier)) {
-                        map.remove(identifier);
-                    } else {
-                        map.put(identifier, entry);
-                    }
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unknown value kind " + entry.kind().name());
-            }
-        }
-    }
-
-    public static void assertNoDelete(Collection<ManifestEntry> entries) {
-        for (ManifestEntry entry : entries) {
-            Preconditions.checkState(
-                    entry.kind() != FileKind.DELETE,
-                    "Trying to delete file %s which is not previously added.",
-                    entry.file().fileName());
-        }
-    }
-
     /**
-     * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
-     * file.
+     * According to the {@link ManifestCacheFilter}, only entries that will be accessed again are
+     * retained in the cache; entries that will never be read back are not cached.
+     *
+     * <p>Implemented on {@link InternalRow} for performance (no deserialization).
      */
-    public static class Identifier {
-        public final BinaryRow partition;
-        public final int bucket;
-        public final int level;
-        public final String fileName;
-
-        /* Cache the hash code for the string */
-        private Integer hash;
-
-        private Identifier(BinaryRow partition, int bucket, int level, String fileName) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.level = level;
-            this.fileName = fileName;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof Identifier)) {
-                return false;
-            }
-            Identifier that = (Identifier) o;
-            return Objects.equals(partition, that.partition)
-                    && bucket == that.bucket
-                    && level == that.level
-                    && Objects.equals(fileName, that.fileName);
-        }
-
-        @Override
-        public int hashCode() {
-            if (hash == null) {
-                hash = Objects.hash(partition, bucket, level, fileName);
-            }
-            return hash;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
-        }
-
-        public String toString(FileStorePathFactory pathFactory) {
-            return pathFactory.getPartitionString(partition)
-                    + ", bucket "
-                    + bucket
-                    + ", level "
-                    + level
-                    + ", file "
-                    + fileName;
-        }
+    public static Filter<InternalRow> createCacheRowFilter(
+            @Nullable ManifestCacheFilter manifestCacheFilter, int numOfBuckets) {
+        if (manifestCacheFilter == null) {
+            return Filter.alwaysTrue();
+        }
+
+        Function<InternalRow, BinaryRow> partitionGetter =
+                ManifestEntrySerializer.partitionGetter();
+        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
+        Function<InternalRow, Integer> totalBucketGetter =
+                ManifestEntrySerializer.totalBucketGetter();
+        return row -> {
+            if (numOfBuckets != totalBucketGetter.apply(row)) {
+                return true;
+            }
+
+            return manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row));
+        };
+    }
+
+    /**
+     * Read the corresponding entries based on the currently required partition and bucket.
+     *
+     * <p>Implemented on {@link InternalRow} for performance (no deserialization).
+     */
+    public static Filter<InternalRow> createEntryRowFilter(
+            @Nullable PartitionPredicate partitionFilter,
+            @Nullable Filter<Integer> bucketFilter,
+            int numOfBuckets) {
+        Function<InternalRow, BinaryRow> partitionGetter =
+                ManifestEntrySerializer.partitionGetter();
+        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
+        Function<InternalRow, Integer> totalBucketGetter =
+                ManifestEntrySerializer.totalBucketGetter();
+        return row -> {
+            if ((partitionFilter != null && !partitionFilter.test(partitionGetter.apply(row)))) {
+                return false;
+            }
+
+            if (bucketFilter != null && numOfBuckets == totalBucketGetter.apply(row)) {
+                return bucketFilter.test(bucketGetter.apply(row));
+            }
+
+            return true;
+        };
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 6070611da50e..a434e2acd28c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -188,5 +188,16 @@ public ManifestFile create() {
                     suggestedFileSize,
                     cache);
         }
+
+        public ObjectsFile<SimpleFileEntry> createSimpleFileEntryReader() {
+            RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.schema());
+            return new ObjectsFile<>(
+                    fileIO,
+                    new SimpleFileEntrySerializer(),
+                    fileFormat.createReaderFactory(entryType),
+                    fileFormat.createWriterFactory(entryType),
+                    pathFactory.manifestFileFactory(),
+                    cache);
+        }
     }
 }
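A hedged usage sketch of the new reader: the method names and the three-argument `read` signature are taken from this diff, while the surrounding class, the `factory` parameter, and `readAll` are hypothetical scaffolding. `AbstractFileStoreScan#readSimpleEntries` (later in this diff) does the same wiring with real partition/bucket filters instead of the nulls used here.

```java
import java.util.List;

import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.utils.ObjectsFile;

public class SimpleEntryReadSketch {

    // Read one manifest as lightweight SimpleFileEntry objects, with no pruning.
    static List<SimpleFileEntry> readAll(
            ManifestFile.Factory factory, String manifestFileName, int numOfBuckets) {
        ObjectsFile<SimpleFileEntry> reader = factory.createSimpleFileEntryReader();
        return reader.read(
                manifestFileName,
                // null ManifestCacheFilter -> Filter.alwaysTrue(): cache every row
                ManifestEntry.createCacheRowFilter(null, numOfBuckets),
                // null partition/bucket filters -> keep every entry
                ManifestEntry.createEntryRowFilter(null, null, numOfBuckets));
    }
}
```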
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 5910ad75f077..beecb4482d25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.ManifestEntry.Identifier;
+import org.apache.paimon.manifest.FileEntry.Identifier;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.stats.BinaryTableStats;
@@ -215,7 +215,7 @@ private static void mergeCandidates(
         }
 
         Map<Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        ManifestEntry.mergeEntries(manifestFile, candidates, map);
+        FileEntry.mergeEntries(manifestFile, candidates, map);
         if (!map.isEmpty()) {
             List<ManifestFileMeta> merged = manifestFile.write(new ArrayList<>(map.values()));
             result.addAll(merged);
@@ -270,7 +270,7 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
 
         // 2.1. try to skip base files by partition filter
 
         Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
-        ManifestEntry.mergeEntries(manifestFile, delta, deltaMerged);
+        FileEntry.mergeEntries(manifestFile, delta, deltaMerged);
 
         List<ManifestFileMeta> result = new ArrayList<>();
         int j = 0;
@@ -314,7 +314,7 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
         Map<Identifier, ManifestEntry> fullMerged = new LinkedHashMap<>();
         for (; j < base.size(); j++) {
             ManifestFileMeta file = base.get(j);
-            ManifestEntry.mergeEntries(manifestFile.read(file.fileName()), fullMerged);
+            FileEntry.mergeEntries(manifestFile.read(file.fileName()), fullMerged);
             boolean contains = false;
             for (Identifier identifier : deleteEntries) {
                 if (fullMerged.containsKey(identifier)) {
@@ -334,8 +334,8 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
 
         // 2.3. merge base files
 
-        ManifestEntry.mergeEntries(manifestFile, base.subList(j, base.size()), fullMerged);
-        ManifestEntry.mergeEntries(deltaMerged.values(), fullMerged);
+        FileEntry.mergeEntries(manifestFile, base.subList(j, base.size()), fullMerged);
+        FileEntry.mergeEntries(deltaMerged.values(), fullMerged);
 
         // 2.4. write new manifest files
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
new file mode 100644
index 000000000000..3e8b88755a79
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** A simple {@link FileEntry} which only contains the identifier and the min/max key. */
+public class SimpleFileEntry implements FileEntry {
+
+    private final FileKind kind;
+    private final BinaryRow partition;
+    private final int bucket;
+    private final int level;
+    private final String fileName;
+    private final BinaryRow minKey;
+    private final BinaryRow maxKey;
+
+    public SimpleFileEntry(
+            FileKind kind,
+            BinaryRow partition,
+            int bucket,
+            int level,
+            String fileName,
+            BinaryRow minKey,
+            BinaryRow maxKey) {
+        this.kind = kind;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.level = level;
+        this.fileName = fileName;
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+    }
+
+    public static SimpleFileEntry from(ManifestEntry entry) {
+        return new SimpleFileEntry(
+                entry.kind(),
+                entry.partition(),
+                entry.bucket(),
+                entry.level(),
+                entry.fileName(),
+                entry.minKey(),
+                entry.maxKey());
+    }
+
+    public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
+        return entries.stream().map(SimpleFileEntry::from).collect(Collectors.toList());
+    }
+
+    @Override
+    public FileKind kind() {
+        return kind;
+    }
+
+    @Override
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    @Override
+    public int bucket() {
+        return bucket;
+    }
+
+    @Override
+    public int level() {
+        return level;
+    }
+
+    @Override
+    public String fileName() {
+        return fileName;
+    }
+
+    @Override
+    public Identifier identifier() {
+        return new Identifier(partition, bucket, level, fileName);
+    }
+
+    @Override
+    public BinaryRow minKey() {
+        return minKey;
+    }
+
+    @Override
+    public BinaryRow maxKey() {
+        return maxKey;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SimpleFileEntry that = (SimpleFileEntry) o;
+        return bucket == that.bucket
+                && level == that.level
+                && kind == that.kind
+                && Objects.equals(partition, that.partition)
+                && Objects.equals(fileName, that.fileName)
+                && Objects.equals(minKey, that.minKey)
+                && Objects.equals(maxKey, that.maxKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, partition, bucket, level, fileName, minKey, maxKey);
+    }
+
+    @Override
+    public String toString() {
+        return "{"
+                + "kind="
+                + kind
+                + ", partition="
+                + partition
+                + ", bucket="
+                + bucket
+                + ", level="
+                + level
+                + ", fileName='"
+                + fileName
+                + '\''
+                + ", minKey="
+                + minKey
+                + ", maxKey="
+                + maxKey
+                + '}';
+    }
+}
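`SimpleFileEntry` keeps only what the commit-time conflict check needs: the identity fields (partition, bucket, level, file name) plus the key range, dropping the full `DataFileMeta` with its statistics. A small sketch of the value-semantics this buys (assuming `BinaryRow.EMPTY_ROW` for an unpartitioned table; the file name is made up):

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.SimpleFileEntry;

public class SimpleFileEntrySketch {
    public static void main(String[] args) {
        BinaryRow part = BinaryRow.EMPTY_ROW; // unpartitioned table
        // Two entries describing the same file compare equal regardless of
        // whether they were read from a manifest or converted via from(...).
        SimpleFileEntry a = new SimpleFileEntry(
                FileKind.ADD, part, 0, 1, "data-1.orc", BinaryRow.EMPTY_ROW, BinaryRow.EMPTY_ROW);
        SimpleFileEntry b = new SimpleFileEntry(
                FileKind.ADD, part, 0, 1, "data-1.orc", BinaryRow.EMPTY_ROW, BinaryRow.EMPTY_ROW);
        System.out.println(a.equals(b));                           // true
        System.out.println(a.identifier().equals(b.identifier())); // true
    }
}
```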
+ */ + +package org.apache.paimon.manifest; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.utils.VersionedObjectSerializer; + +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; + +/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only supports reading. */ +public class SimpleFileEntrySerializer extends VersionedObjectSerializer { + + private static final long serialVersionUID = 1L; + + private final int version; + + public SimpleFileEntrySerializer() { + super(ManifestEntry.schema()); + this.version = new ManifestEntrySerializer().getVersion(); + } + + @Override + public int getVersion() { + return version; + } + + @Override + public InternalRow convertTo(SimpleFileEntry meta) { + throw new UnsupportedOperationException("Only supports convert from row."); + } + + @Override + public SimpleFileEntry convertFrom(int version, InternalRow row) { + if (this.version != version) { + throw new IllegalArgumentException("Unsupported version: " + version); + } + + InternalRow file = row.getRow(4, 3); + return new SimpleFileEntry( + FileKind.fromByteValue(row.getByte(0)), + deserializeBinaryRow(row.getBinary(1)), + row.getInt(2), + file.getInt(10), + file.getString(0).toString(), + deserializeBinaryRow(file.getBinary(3)), + deserializeBinaryRow(file.getBinary(4))); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 81b173321a2d..e6c95e8854b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -20,13 +20,13 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.InternalRow; +import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.manifest.ManifestEntrySerializer; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.operation.metrics.ScanMetrics; import org.apache.paimon.operation.metrics.ScanStats; import org.apache.paimon.partition.PartitionPredicate; @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -67,10 +68,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private final ManifestList manifestList; private final int numOfBuckets; private final boolean checkNumOfBuckets; + private final Integer scanManifestParallelism; private final ConcurrentMap tableSchemas; private final SchemaManager schemaManager; protected final ScanBucketFilter bucketKeyFilter; + private final String branchName; private PartitionPredicate partitionFilter; private Snapshot specifiedSnapshot = null; @@ -81,10 +84,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Long dataFileTimeMills = null; private ManifestCacheFilter manifestCacheFilter = null; - private final Integer scanManifestParallelism; - private ScanMetrics scanMetrics = null; - 
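The serializer above projects by field position instead of pushing a projection into the file format. The index map it relies on, written out as comments; this layout is inferred from the accessors in `convertFrom` and from `ManifestEntry.schema()`, so treat it as an assumption rather than documentation:

```java
// Layout of one versioned ManifestEntry row, read positionally above:
//   0: _KIND          (tinyint) -> FileKind.fromByteValue(row.getByte(0))
//   1: _PARTITION     (bytes)   -> deserializeBinaryRow(row.getBinary(1))
//   2: _BUCKET        (int)     -> row.getInt(2)
//   3: _TOTAL_BUCKETS (int)     -> skipped entirely
//   4: _FILE          (row)     -> row.getRow(4, 13); of its 13 fields only
//        0: _FILE_NAME, 3: _MIN_KEY, 4: _MAX_KEY, 10: _LEVEL are read;
//        the stats, sequence numbers and remaining fields are never deserialized.
```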
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 81b173321a2d..e6c95e8854b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -20,13 +20,13 @@
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestEntrySerializer;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.operation.metrics.ScanStats;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -54,6 +54,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
@@ -67,10 +68,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private final ManifestList manifestList;
     private final int numOfBuckets;
     private final boolean checkNumOfBuckets;
+    private final Integer scanManifestParallelism;
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final SchemaManager schemaManager;
     protected final ScanBucketFilter bucketKeyFilter;
+    private final String branchName;
 
     private PartitionPredicate partitionFilter;
     private Snapshot specifiedSnapshot = null;
@@ -81,10 +84,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private Long dataFileTimeMills = null;
 
     private ManifestCacheFilter manifestCacheFilter = null;
-    private final Integer scanManifestParallelism;
-
     private ScanMetrics scanMetrics = null;
-    private String branchName;
 
     public AbstractFileStoreScan(
             RowType partitionType,
@@ -112,21 +112,13 @@ public AbstractFileStoreScan(
 
     @Override
     public FileStoreScan withPartitionFilter(Predicate predicate) {
-        if (partitionType.getFieldCount() > 0 && predicate != null) {
-            this.partitionFilter = PartitionPredicate.fromPredicate(predicate);
-        } else {
-            this.partitionFilter = null;
-        }
+        this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate);
         return this;
     }
 
     @Override
     public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
-        if (partitionType.getFieldCount() > 0 && !partitions.isEmpty()) {
-            this.partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions);
-        } else {
-            this.partitionFilter = null;
-        }
+        this.partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions);
         return this;
     }
 
@@ -216,7 +208,7 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
 
     @Override
     public Plan plan() {
-        Pair<Snapshot, List<ManifestEntry>> planResult = doPlan(this::readManifestFileMeta);
+        Pair<Snapshot, List<ManifestEntry>> planResult = doPlan();
 
         final Snapshot readSnapshot = planResult.getLeft();
         final List<ManifestEntry> files = planResult.getRight();
@@ -246,44 +238,34 @@ public List<ManifestEntry> files() {
         };
     }
 
-    private Pair<Snapshot, List<ManifestEntry>> doPlan(
-            Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
+    @Override
+    public List<SimpleFileEntry> readSimpleEntries() {
+        List<ManifestFileMeta> manifests = readManifests().getRight();
+        Collection<SimpleFileEntry> mergedEntries =
+                readAndMergeFileEntries(
+                        manifests, this::readSimpleEntries, Filter.alwaysTrue(), new AtomicLong());
+        return new ArrayList<>(mergedEntries);
+    }
+
+    private Pair<Snapshot, List<ManifestEntry>> doPlan() {
         long started = System.nanoTime();
-        List<ManifestFileMeta> manifests = specifiedManifests;
-        Snapshot snapshot = null;
-        if (manifests == null) {
-            snapshot =
-                    specifiedSnapshot == null
-                            ? snapshotManager.latestSnapshot(branchName)
-                            : specifiedSnapshot;
-            if (snapshot == null) {
-                manifests = Collections.emptyList();
-            } else {
-                manifests = readManifests(snapshot);
-            }
-        }
+        Pair<Snapshot, List<ManifestFileMeta>> snapshotListPair = readManifests();
+        Snapshot snapshot = snapshotListPair.getLeft();
+        List<ManifestFileMeta> manifests = snapshotListPair.getRight();
 
         long startDataFiles =
                 manifests.stream().mapToLong(f -> f.numAddedFiles() + f.numDeletedFiles()).sum();
 
         AtomicLong cntEntries = new AtomicLong(0);
-        Iterable<ManifestEntry> entries =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        files -> {
-                            List<ManifestEntry> entryList =
-                                    files.parallelStream()
-                                            .filter(this::filterManifestFileMeta)
-                                            .flatMap(m -> readManifest.apply(m).stream())
-                                            .filter(this::filterUnmergedManifestEntry)
-                                            .collect(Collectors.toList());
-                            cntEntries.getAndAdd(entryList.size());
-                            return entryList;
-                        },
+
+        Collection<ManifestEntry> mergedEntries =
+                readAndMergeFileEntries(
                         manifests,
-                        scanManifestParallelism);
+                        this::readManifestFileMeta,
+                        this::filterUnmergedManifestEntry,
+                        cntEntries);
 
         List<ManifestEntry> files = new ArrayList<>();
-        Collection<ManifestEntry> mergedEntries = ManifestEntry.mergeEntries(entries);
         long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
         for (ManifestEntry file : mergedEntries) {
             if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
@@ -349,6 +331,50 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
         return Pair.of(snapshot, files);
     }
 
+    public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
+            List<ManifestFileMeta> manifests,
+            Function<ManifestFileMeta, List<T>> manifestReader,
+            @Nullable Filter<T> filterUnmergedEntry,
+            @Nullable AtomicLong readEntries) {
+        Iterable<T> entries =
+                ScanParallelExecutor.parallelismBatchIterable(
+                        files -> {
+                            Stream<T> stream =
+                                    files.parallelStream()
+                                            .filter(this::filterManifestFileMeta)
+                                            .flatMap(m -> manifestReader.apply(m).stream());
+                            if (filterUnmergedEntry != null) {
+                                stream = stream.filter(filterUnmergedEntry::test);
+                            }
+                            List<T> entryList = stream.collect(Collectors.toList());
+                            if (readEntries != null) {
+                                readEntries.getAndAdd(entryList.size());
+                            }
+                            return entryList;
+                        },
+                        manifests,
+                        scanManifestParallelism);
+
+        return FileEntry.mergeEntries(entries);
+    }
+
+    private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
+        List<ManifestFileMeta> manifests = specifiedManifests;
+        Snapshot snapshot = null;
+        if (manifests == null) {
+            snapshot =
+                    specifiedSnapshot == null
+                            ? snapshotManager.latestSnapshot(branchName)
+                            : specifiedSnapshot;
+            if (snapshot == null) {
+                manifests = Collections.emptyList();
+            } else {
+                manifests = readManifests(snapshot);
+            }
+        }
+        return Pair.of(snapshot, manifests);
+    }
+
     private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
         switch (scanMode) {
             case ALL:
@@ -426,47 +452,25 @@ private boolean filterMergedManifestEntry(ManifestEntry entry) {
     private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
         return manifestFileFactory
                 .create()
-                .read(manifest.fileName(), manifestCacheRowFilter(), manifestEntryRowFilter());
+                .read(
+                        manifest.fileName(),
+                        ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
+                        ManifestEntry.createEntryRowFilter(
+                                partitionFilter, bucketFilter, numOfBuckets));
     }
 
     /** Note: Keep this thread-safe. */
-    private Filter<InternalRow> manifestEntryRowFilter() {
-        Function<InternalRow, BinaryRow> partitionGetter =
-                ManifestEntrySerializer.partitionGetter();
-        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
-        Function<InternalRow, Integer> totalBucketGetter =
-                ManifestEntrySerializer.totalBucketGetter();
-        return row -> {
-            if ((partitionFilter != null && !partitionFilter.test(partitionGetter.apply(row)))) {
-                return false;
-            }
-
-            if (bucketFilter != null && numOfBuckets == totalBucketGetter.apply(row)) {
-                return bucketFilter.test(bucketGetter.apply(row));
-            }
-
-            return true;
-        };
-    }
-
-    /** Note: Keep this thread-safe. */
-    private Filter<InternalRow> manifestCacheRowFilter() {
-        if (manifestCacheFilter == null) {
-            return Filter.alwaysTrue();
-        }
-
-        Function<InternalRow, BinaryRow> partitionGetter =
-                ManifestEntrySerializer.partitionGetter();
-        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
-        Function<InternalRow, Integer> totalBucketGetter =
-                ManifestEntrySerializer.totalBucketGetter();
-        return row -> {
-            if (numOfBuckets != totalBucketGetter.apply(row)) {
-                return true;
-            }
-
-            return manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row));
-        };
+    private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) {
+        return manifestFileFactory
+                .createSimpleFileEntryReader()
+                .read(
+                        manifest.fileName(),
+                        // use filter for ManifestEntry
+                        // currently, projection is not pushed down to file format
+                        // see SimpleFileEntrySerializer
+                        ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
+                        ManifestEntry.createEntryRowFilter(
+                                partitionFilter, bucketFilter, numOfBuckets));
     }
 
     // ------------------------------------------------------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 2f7817f03203..79a64b049916 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
@@ -256,7 +257,7 @@ protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) throws IOException {
         for (String manifest : files) {
             List<ManifestEntry> entries;
             entries = manifestFile.readWithIOException(manifest);
-            ManifestEntry.mergeEntries(entries, map);
+            FileEntry.mergeEntries(entries, map);
         }
 
         return map.values();
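The commit path changed next runs its conflict check on `SimpleFileEntry` instead of full `ManifestEntry` objects. A sketch of the core check, extracted into a free-standing helper; the class and method here are hypothetical, while `SimpleFileEntry.from`, `FileEntry.mergeEntries`, and `FileEntry.assertNoDelete` are the real APIs introduced above:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;

public class ConflictCheckSketch {

    // baseEntries come from scan.readSimpleEntries(); changes from this commit.
    static Collection<SimpleFileEntry> checkNoConflicts(
            List<SimpleFileEntry> baseEntries, List<ManifestEntry> changes) {
        List<SimpleFileEntry> all = new ArrayList<>(baseEntries);
        all.addAll(SimpleFileEntry.from(changes));
        // Merging cancels matching ADD/DELETE pairs; a DELETE surviving the
        // merge refers to a file the base snapshot no longer has, so
        // assertNoDelete throws and the commit is retried.
        Collection<SimpleFileEntry> merged = FileEntry.mergeEntries(all);
        FileEntry.assertNoDelete(merged);
        return merged;
    }
}
```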
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 0264f1e45a3b..a64b8239b25c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -26,6 +26,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
@@ -34,6 +35,7 @@
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.operation.metrics.CommitStats;
 import org.apache.paimon.options.MemorySize;
@@ -48,7 +50,6 @@
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -101,7 +102,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final SchemaManager schemaManager;
     private final String commitUser;
     private final RowType partitionType;
-    private final RowDataToObjectArrayConverter partitionObjectConverter;
     private final FileStorePathFactory pathFactory;
     private final SnapshotManager snapshotManager;
     private final ManifestFile manifestFile;
@@ -146,7 +146,6 @@ public FileStoreCommitImpl(
         this.schemaManager = schemaManager;
         this.commitUser = commitUser;
         this.partitionType = partitionType;
-        this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
         this.pathFactory = pathFactory;
         this.snapshotManager = snapshotManager;
         this.manifestFile = manifestFileFactory.create();
@@ -213,7 +212,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properties) {
         int attempts = 0;
         Snapshot latestSnapshot = null;
         Long safeLatestSnapshotId = null;
-        List<ManifestEntry> baseEntries = new ArrayList<>();
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
 
         List<ManifestEntry> appendTableFiles = new ArrayList<>();
         List<ManifestEntry> appendChangelog = new ArrayList<>();
@@ -228,6 +227,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properties) {
                 compactChangelog,
                 appendIndexFiles);
         try {
+            List<SimpleFileEntry> appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
             if (!ignoreEmptyCommit
                     || !appendTableFiles.isEmpty()
                     || !appendChangelog.isEmpty()
@@ -246,7 +246,8 @@ public void commit(ManifestCommittable committable, Map<String, String> properties) {
                     baseEntries.addAll(
                             readAllEntriesFromChangedPartitions(
                                     latestSnapshot, appendTableFiles, compactTableFiles));
-                    noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, appendTableFiles);
+                    noConflictsOrFail(
+                            latestSnapshot.commitUser(), baseEntries, appendSimpleEntries);
                     safeLatestSnapshotId = latestSnapshot.id();
                 }
 
@@ -274,8 +275,11 @@ public void commit(ManifestCommittable committable, Map<String, String> properties) {
                 // This optimization is mainly used to decrease the number of times we read from
                 // files.
                 if (safeLatestSnapshotId != null) {
-                    baseEntries.addAll(appendTableFiles);
-                    noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, compactTableFiles);
+                    baseEntries.addAll(appendSimpleEntries);
+                    noConflictsOrFail(
+                            latestSnapshot.commitUser(),
+                            baseEntries,
+                            SimpleFileEntry.from(compactTableFiles));
                     // assume this compact commit follows just after the append commit created above
                     safeLatestSnapshotId += 1;
                 }
@@ -924,7 +928,7 @@ public boolean tryCommitOnce(
     }
 
     @SafeVarargs
-    private final List<ManifestEntry> readAllEntriesFromChangedPartitions(
+    private final List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
             Snapshot snapshot, List<ManifestEntry>... changes) {
         List<BinaryRow> changedPartitions =
                 Arrays.stream(changes)
@@ -935,8 +939,7 @@ private final List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
         try {
             return scan.withSnapshot(snapshot)
                     .withPartitionFilter(changedPartitions)
-                    .plan()
-                    .files();
+                    .readSimpleEntries();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot read manifest entries from changed partitions.", e);
         }
@@ -947,19 +950,21 @@ private void noConflictsOrFail(
         noConflictsOrFail(
                 baseCommitUser,
                 readAllEntriesFromChangedPartitions(latestSnapshot, changes),
-                changes);
+                SimpleFileEntry.from(changes));
     }
 
     private void noConflictsOrFail(
-            String baseCommitUser, List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
-        List<ManifestEntry> allEntries = new ArrayList<>(baseEntries);
+            String baseCommitUser,
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> changes) {
+        List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(changes);
 
-        Collection<ManifestEntry> mergedEntries;
+        Collection<SimpleFileEntry> mergedEntries;
         try {
             // merge manifest entries and also check if the files we want to delete are still there
-            mergedEntries = ManifestEntry.mergeEntries(allEntries);
-            ManifestEntry.assertNoDelete(mergedEntries);
+            mergedEntries = FileEntry.mergeEntries(allEntries);
+            FileEntry.assertNoDelete(mergedEntries);
         } catch (Throwable e) {
             Pair<RuntimeException, RuntimeException> conflictException =
                     createConflictException(
@@ -979,9 +984,9 @@ private void noConflictsOrFail(
         }
 
         // group entries by partitions, buckets and levels
-        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
-        for (ManifestEntry entry : mergedEntries) {
-            int level = entry.file().level();
+        Map<LevelIdentifier, List<SimpleFileEntry>> levels = new HashMap<>();
+        for (SimpleFileEntry entry : mergedEntries) {
+            int level = entry.level();
             if (level >= 1) {
                 levels.computeIfAbsent(
                                 new LevelIdentifier(entry.partition(), entry.bucket(), level),
@@ -991,12 +996,12 @@ private void noConflictsOrFail(
         }
 
         // check for all LSM level >= 1, key ranges of files do not intersect
-        for (List<ManifestEntry> entries : levels.values()) {
-            entries.sort((a, b) -> keyComparator.compare(a.file().minKey(), b.file().minKey()));
+        for (List<SimpleFileEntry> entries : levels.values()) {
+            entries.sort((a, b) -> keyComparator.compare(a.minKey(), b.minKey()));
             for (int i = 0; i + 1 < entries.size(); i++) {
-                ManifestEntry a = entries.get(i);
-                ManifestEntry b = entries.get(i + 1);
-                if (keyComparator.compare(a.file().maxKey(), b.file().minKey()) >= 0) {
+                SimpleFileEntry a = entries.get(i);
+                SimpleFileEntry b = entries.get(i + 1);
+                if (keyComparator.compare(a.maxKey(), b.minKey()) >= 0) {
                     Pair<RuntimeException, RuntimeException> conflictException =
                             createConflictException(
                                     "LSM conflicts detected! Give up committing. Conflict files are:\n"
@@ -1024,8 +1029,8 @@ private void noConflictsOrFail(
     private Pair<RuntimeException, RuntimeException> createConflictException(
             String message,
             String baseCommitUser,
-            List<ManifestEntry> baseEntries,
-            List<ManifestEntry> changes,
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> changes,
             Throwable cause,
             int maxEntry) {
         String possibleCauses =
@@ -1058,13 +1063,11 @@ private Pair<RuntimeException, RuntimeException> createConflictException(
         String baseEntriesString =
                 "Base entries are:\n"
                         + baseEntries.stream()
-                                .map(ManifestEntry::toString)
+                                .map(Object::toString)
                                 .collect(Collectors.joining("\n"));
         String changesString =
                 "Changes are:\n"
-                        + changes.stream()
-                                .map(ManifestEntry::toString)
-                                .collect(Collectors.joining("\n"));
+                        + changes.stream().map(Object::toString).collect(Collectors.joining("\n"));
 
         RuntimeException fullException =
                 new RuntimeException(
@@ -1085,12 +1088,12 @@ private Pair<RuntimeException, RuntimeException> createConflictException(
                     "Base entries are:\n"
                             + baseEntries.subList(0, Math.min(baseEntries.size(), maxEntry))
                                     .stream()
-                                    .map(ManifestEntry::toString)
+                                    .map(Object::toString)
                                     .collect(Collectors.joining("\n"));
             changesString =
                     "Changes are:\n"
                             + changes.subList(0, Math.min(changes.size(), maxEntry)).stream()
-                                    .map(ManifestEntry::toString)
+                                    .map(Object::toString)
                                     .collect(Collectors.joining("\n"));
             simplifiedException =
                     new RuntimeException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index f9d60173b7c4..dbed15bd3bd0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -25,6 +25,7 @@
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
@@ -73,6 +74,12 @@ public interface FileStoreScan {
     /** Produce a {@link Plan}. */
     Plan plan();
 
+    /**
+     * Read {@link SimpleFileEntry}s. A {@link SimpleFileEntry} only retains some critical
+     * information, so filtering based on file statistics cannot be performed here.
+     */
+    List<SimpleFileEntry> readSimpleEntries();
+
     /** Result plan of this scan. */
     interface Plan {
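The LSM check in `noConflictsOrFail` reduces to: within each (partition, bucket, level >= 1) group, sort files by min key and require that neighbouring key ranges do not touch. A standalone miniature, with int ranges standing in for `BinaryRow` keys and the `>=` comparison mirroring `keyComparator.compare(a.maxKey(), b.minKey()) >= 0` above:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class KeyRangeCheckSketch {

    // Each int[] is {minKey, maxKey} for one data file in the group.
    static void assertNoOverlap(List<int[]> ranges) {
        ranges.sort(Comparator.comparingInt(r -> r[0]));
        for (int i = 0; i + 1 < ranges.size(); i++) {
            if (ranges.get(i)[1] >= ranges.get(i + 1)[0]) {
                throw new IllegalStateException(
                        "LSM conflicts detected! Overlapping key ranges at level >= 1.");
            }
        }
    }

    public static void main(String[] args) {
        assertNoOverlap(new ArrayList<>(List.of(new int[] {0, 5}, new int[] {6, 9}))); // ok
        try {
            assertNoOverlap(new ArrayList<>(List.of(new int[] {0, 5}, new int[] {5, 9})));
        } catch (IllegalStateException e) {
            // max of one file equals min of the next -> conflict
            System.out.println(e.getMessage());
        }
    }
}
```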
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index b0a89e29e798..1675c6c203a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -30,6 +30,8 @@
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
+import javax.annotation.Nullable;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -42,11 +44,21 @@ public interface PartitionPredicate {
     boolean test(
             long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts);
 
-    static PartitionPredicate fromPredicate(Predicate predicate) {
+    @Nullable
+    static PartitionPredicate fromPredicate(RowType partitionType, Predicate predicate) {
+        if (partitionType.getFieldCount() == 0 || predicate == null) {
+            return null;
+        }
+
         return new DefaultPartitionPredicate(predicate);
     }
 
+    @Nullable
     static PartitionPredicate fromMultiple(RowType partitionType, List<BinaryRow> partitions) {
+        if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) {
+            return null;
+        }
+
         return new MultiplePartitionPredicate(
                 new RowDataToObjectArrayConverter(partitionType), new HashSet<>(partitions));
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 4c1b3aa30d13..61a465e4b6a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -40,7 +40,7 @@
 import static org.apache.paimon.utils.FileUtils.createFormatReader;
 
 /** A file which contains several {@link T}s, provides read and write. */
-public abstract class ObjectsFile<T> {
+public class ObjectsFile<T> {
 
     protected final FileIO fileIO;
     protected final ObjectSerializer<T> serializer;
@@ -50,7 +50,7 @@ public class ObjectsFile<T> {
 
     @Nullable private final ObjectsCache<String, T> cache;
 
-    protected ObjectsFile(
+    public ObjectsFile(
             FileIO fileIO,
             ObjectSerializer<T> serializer,
             FormatReaderFactory readerFactory,
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index ac0c0a64e2a1..611876867c81 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -106,7 +106,7 @@ protected void assertEquivalentEntries(
                         .flatMap(f -> getManifestFile().read(f.fileName()).stream())
                         .collect(Collectors.toList());
         List<String> entryBeforeMerge =
-                ManifestEntry.mergeEntries(inputEntry).stream()
+                FileEntry.mergeEntries(inputEntry).stream()
                         .map(entry -> entry.kind() + "-" + entry.file().fileName())
                         .collect(Collectors.toList());
diff --git a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
index f43610f71d07..1263529db1fc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
@@ -60,7 +60,7 @@ public void testPartition() {
                         and(builder.equal(0, 3), builder.equal(1, 5)),
                         and(builder.equal(0, 4), builder.equal(1, 6)));
 
-        PartitionPredicate p1 = PartitionPredicate.fromPredicate(predicate);
+        PartitionPredicate p1 = PartitionPredicate.fromPredicate(type, predicate);
         PartitionPredicate p2 =
                 PartitionPredicate.fromMultiple(
                         type, Arrays.asList(createPart(3, 5), createPart(4, 6)));
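The new null contract in `PartitionPredicate` is worth spelling out: both factories now return null when there is nothing to filter on, and `AbstractFileStoreScan` simply stores that null instead of special-casing empty partition types at every call site. A sketch, assuming `RowType.of()` with no arguments yields a zero-field row type:

```java
import java.util.Collections;

import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.types.RowType;

public class PartitionPredicateNullSketch {
    public static void main(String[] args) {
        RowType unpartitioned = RowType.of(); // table with no partition columns
        PartitionPredicate p =
                PartitionPredicate.fromMultiple(unpartitioned, Collections.emptyList());
        // null means "no partition filtering"; the scan keeps every partition
        System.out.println(p == null); // true
    }
}
```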