diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index fc3f2a3d6d20..bb3074ac7ba3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -197,6 +197,11 @@ public BucketFileRead bucketReader(BinaryRow partition, int bucket) {
                                 .withPartition(partition)
                                 .withBucket(bucket)
                                 .withDataFiles(files)
+                                .rawConvertible(true)
+                                .withBucketPath(
+                                        pathFactory
+                                                .bucketPath(partition, bucket)
+                                                .toString())
                                 .build()));
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index fbd5f723e975..027aa0662725 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -33,12 +33,13 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.stream.Collectors;
 
+import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Input splits. Needed by most batch computation engines. */
@@ -56,7 +57,8 @@ public class DataSplit implements Split {
     private List<DataFileMeta> dataFiles;
     @Nullable private List<DeletionFile> dataDeletionFiles;
 
-    private List<RawFile> rawFiles = Collections.emptyList();
+    private boolean rawConvertible;
+    private String bucketPath;
 
     public DataSplit() {}
 
@@ -93,6 +95,14 @@ public boolean isStreaming() {
         return isStreaming;
     }
 
+    public boolean rawConvertible() {
+        return rawConvertible;
+    }
+
+    public String getBucketPath() {
+        return bucketPath;
+    }
+
     public OptionalLong getLatestFileCreationEpochMillis() {
         return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
     }
@@ -108,13 +118,61 @@ public long rowCount() {
 
     @Override
     public Optional<List<RawFile>> convertToRawFiles() {
-        if (rawFiles.isEmpty()) {
-            return Optional.empty();
+        if (rawConvertible) {
+            return Optional.of(
+                    dataFiles.stream()
+                            .map(f -> makeRawTableFile(bucketPath, f))
+                            .collect(Collectors.toList()));
         } else {
-            return Optional.of(rawFiles);
+            return Optional.empty();
         }
     }
 
+    private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
+        return new RawFile(
+                bucketPath + "/" + meta.fileName(),
+                0,
+                meta.fileSize(),
+                meta.fileFormat()
+                        .map(t -> t.toString().toLowerCase())
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "Can't find format from file: "
+                                                        + bucketPath
+                                                        + "/"
+                                                        + meta.fileName())),
+                meta.schemaId(),
+                meta.rowCount());
+    }
+
+    @Override
+    @Nullable
+    public Optional<List<IndexFile>> indexFiles() {
+        List<IndexFile> indexFiles = new ArrayList<>();
+        boolean hasIndexFile = false;
+        for (DataFileMeta file : dataFiles) {
+            List<String> exFiles =
+                    file.extraFiles().stream()
+                            .filter(s -> s.endsWith(INDEX_PATH_SUFFIX))
+                            .collect(Collectors.toList());
+            if (exFiles.isEmpty()) {
+                indexFiles.add(null);
+            } else if (exFiles.size() == 1) {
+                hasIndexFile = true;
+                indexFiles.add(new IndexFile(bucketPath + "/" + exFiles.get(0)));
+            } else {
+                throw new RuntimeException(
+                        "Wrong number of file index for file "
+                                + file.fileName()
+                                + " index files: "
+                                + String.join(",", exFiles));
+            }
+        }
+
+        return hasIndexFile ? Optional.of(indexFiles) : Optional.empty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -131,7 +189,8 @@ public boolean equals(Object o) {
                 && Objects.equals(dataFiles, split.dataFiles)
                 && Objects.equals(dataDeletionFiles, split.dataDeletionFiles)
                 && isStreaming == split.isStreaming
-                && Objects.equals(rawFiles, split.rawFiles);
+                && rawConvertible == split.rawConvertible
+                && Objects.equals(bucketPath, split.bucketPath);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
@@ -144,7 +203,8 @@ public int hashCode() {
                 dataFiles,
                 dataDeletionFiles,
                 isStreaming,
-                rawFiles);
+                rawConvertible,
+                bucketPath);
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -164,7 +224,8 @@ private void assign(DataSplit other) {
         this.dataFiles = other.dataFiles;
         this.dataDeletionFiles = other.dataDeletionFiles;
         this.isStreaming = other.isStreaming;
-        this.rawFiles = other.rawFiles;
+        this.rawConvertible = other.rawConvertible;
+        this.bucketPath = other.bucketPath;
     }
 
     public void serialize(DataOutputView out) throws IOException {
@@ -189,10 +250,8 @@ public void serialize(DataOutputView out) throws IOException {
 
         out.writeBoolean(isStreaming);
 
-        out.writeInt(rawFiles.size());
-        for (RawFile rawFile : rawFiles) {
-            rawFile.serialize(out);
-        }
+        out.writeBoolean(rawConvertible);
+        out.writeUTF(bucketPath);
     }
 
     public static DataSplit deserialize(DataInputView in) throws IOException {
@@ -218,12 +277,8 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
         List<DeletionFile> dataDeletionFiles = DeletionFile.deserializeList(in);
 
         boolean isStreaming = in.readBoolean();
-
-        int rawFileNum = in.readInt();
-        List<RawFile> rawFiles = new ArrayList<>();
-        for (int i = 0; i < rawFileNum; i++) {
-            rawFiles.add(RawFile.deserialize(in));
-        }
+        boolean rawConvertible = in.readBoolean();
+        String bucketPath = in.readUTF();
 
         DataSplit.Builder builder =
                 builder()
@@ -233,7 +288,9 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
                         .withBeforeFiles(beforeFiles)
                         .withDataFiles(dataFiles)
                         .isStreaming(isStreaming)
-                        .rawFiles(rawFiles);
+                        .rawConvertible(rawConvertible)
+                        .withBucketPath(bucketPath);
+
         if (beforeDeletionFiles != null) {
             builder.withBeforeDeletionFiles(beforeDeletionFiles);
         }
@@ -292,8 +349,13 @@ public Builder isStreaming(boolean isStreaming) {
             return this;
         }
 
-        public Builder rawFiles(List<RawFile> rawFiles) {
-            this.split.rawFiles = rawFiles;
+        public Builder rawConvertible(boolean rawConvertible) {
+            this.split.rawConvertible = rawConvertible;
+            return this;
+        }
+
+        public Builder withBucketPath(String bucketPath) {
+            this.split.bucketPath = bucketPath;
             return this;
         }
 
@@ -301,6 +363,7 @@ public DataSplit build() {
            checkArgument(split.partition != null);
            checkArgument(split.bucket != -1);
            checkArgument(split.dataFiles != null);
+            checkArgument(split.bucketPath != null);
 
            DataSplit split = new DataSplit();
            split.assign(this.split);
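
Note on the DataSplit change above: a split no longer carries a serialized RawFile list; it only records a rawConvertible flag plus the bucket path and rebuilds RawFile (and IndexFile) views on demand from the DataFileMeta it already holds. Below is a minimal, hypothetical sketch of how a caller now builds and inspects such a split; the class, method names and the example path are illustrative and not part of this patch.

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;

import java.util.List;

public class RawConvertibleSplitSketch {

    // Builds an append-only split: bucketPath is now mandatory, raw files are no longer stored.
    static DataSplit newAppendOnlySplit(
            long snapshotId,
            BinaryRow partition,
            int bucket,
            String bucketPath,
            List<DataFileMeta> files) {
        return DataSplit.builder()
                .withSnapshot(snapshotId)
                .withPartition(partition)
                .withBucket(bucket)
                .withDataFiles(files)
                .rawConvertible(true) // append-only files can be served as raw files
                .withBucketPath(bucketPath) // e.g. "/warehouse/db.db/t/bucket-0" (illustrative)
                .build();
    }

    // RawFile entries are derived lazily from bucketPath + DataFileMeta instead of being
    // serialized with the split.
    static int countRawFiles(DataSplit split) {
        return split.convertToRawFiles().map(List::size).orElse(0);
    }
}

Because build() now checks that a bucket path was set, every place that constructs a DataSplit directly (including the tests further down in this patch) has to call withBucketPath(...).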
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java b/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java
new file mode 100644
index 000000000000..1c7814d4e163
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** Index file for data file. */
+public class IndexFile {
+
+    private final String path;
+
+    public IndexFile(String path) {
+        this.path = path;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof IndexFile)) {
+            return false;
+        }
+
+        IndexFile other = (IndexFile) o;
+        return Objects.equals(path, other.path);
+    }
+
+    public void serialize(DataOutputView out) throws IOException {
+        out.writeUTF(path);
+    }
+
+    public static IndexFile deserialize(DataInputView in) throws IOException {
+        String path = in.readUTF();
+        return new IndexFile(path);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
index adefb868fc9d..d6cb381e246a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
@@ -51,4 +51,14 @@ default Optional<List<RawFile>> convertToRawFiles() {
     default Optional<List<DeletionFile>> deletionFiles() {
         return Optional.empty();
     }
+
+    /**
+     * Returns the index file of each data file, for example, a bloom-filter index. All index
+     * types and columns are stored in one single index file per data file.
+     *
+     * <p>If there is no corresponding index file, the element will be null.
+     */
+    default Optional<List<IndexFile>> indexFiles() {
+        return Optional.empty();
+    }
 }
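
The new Split#indexFiles() contract is positional: the returned list is aligned with dataFiles() (and therefore with convertToRawFiles()), and an element is null when a data file has no index. A hedged illustration of how a consuming engine could use the two lists together; the class and method names are illustrative only.

import org.apache.paimon.table.source.IndexFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.Split;

import java.util.List;

public class SplitIndexLookupSketch {

    // Entry i of indexFiles() (possibly null) belongs to entry i of convertToRawFiles().
    static void describe(Split split) {
        List<RawFile> rawFiles = split.convertToRawFiles().orElse(null);
        if (rawFiles == null) {
            return; // not raw-convertible: fall back to Paimon's own reader
        }
        List<IndexFile> indexFiles = split.indexFiles().orElse(null);
        for (int i = 0; i < rawFiles.size(); i++) {
            IndexFile index = indexFiles == null ? null : indexFiles.get(i);
            System.out.println(
                    rawFiles.get(i) + (index == null ? " (no index file)" : " -> " + index));
        }
    }
}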
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 602a6370a76f..86705ce44e76 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -23,17 +23,22 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /** {@link StartingScanner} for incremental changes by snapshot. */
 public class IncrementalStartingScanner extends AbstractStartingScanner {
@@ -52,34 +57,49 @@ public IncrementalStartingScanner(
 
     @Override
     public Result scan(SnapshotReader reader) {
-        Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new HashMap<>();
+        Map<SplitInfo, List<DataFileMeta>> grouped = new HashMap<>();
         for (long i = startingSnapshotId + 1; i < endingSnapshotId + 1; i++) {
             List<DataSplit> splits = readSplits(reader, snapshotManager.snapshot(i));
             for (DataSplit split : splits) {
                 grouped.computeIfAbsent(
-                                Pair.of(split.partition(), split.bucket()), k -> new ArrayList<>())
+                                new SplitInfo(
+                                        split.partition(),
+                                        split.bucket(),
+                                        // set to false, because reading multiple snapshots may
+                                        // need merging for primary key tables
+                                        false,
+                                        split.getBucketPath(),
+                                        split.deletionFiles().orElse(null)),
+                                k -> new ArrayList<>())
                         .addAll(split.dataFiles());
             }
         }
 
-        List<DataSplit> result = new ArrayList<>();
-        for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : grouped.entrySet()) {
-            BinaryRow partition = entry.getKey().getLeft();
-            int bucket = entry.getKey().getRight();
+        List<Split> result = new ArrayList<>();
+        for (Map.Entry<SplitInfo, List<DataFileMeta>> entry : grouped.entrySet()) {
+            BinaryRow partition = entry.getKey().partition;
+            int bucket = entry.getKey().bucket;
+            boolean rawConvertible = entry.getKey().rawConvertible;
+            String bucketPath = entry.getKey().bucketPath;
+            List<DeletionFile> deletionFiles = entry.getKey().deletionFiles;
             for (SplitGenerator.SplitGroup splitGroup :
                     reader.splitGenerator().splitForBatch(entry.getValue())) {
-                // TODO pass deletion files
-                result.add(
+                DataSplit.Builder dataSplitBuilder =
                         DataSplit.builder()
                                 .withSnapshot(endingSnapshotId)
                                 .withPartition(partition)
                                 .withBucket(bucket)
                                 .withDataFiles(splitGroup.files)
-                                .build());
+                                .rawConvertible(rawConvertible)
+                                .withBucketPath(bucketPath);
+                if (deletionFiles != null) {
+                    dataSplitBuilder.withDataDeletionFiles(deletionFiles);
+                }
+                result.add(dataSplitBuilder.build());
             }
         }
 
-        return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, (List) result));
+        return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, result));
     }
 
     private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
@@ -110,4 +130,49 @@ private List<DataSplit> readChangeLogSplits(SnapshotReader reader, Snapshot s) {
         }
         return (List) reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
     }
+
+    /** Split information to pass. */
+    private static class SplitInfo {
+
+        private final BinaryRow partition;
+        private final int bucket;
+        private final boolean rawConvertible;
+        private final String bucketPath;
+        @Nullable private final List<DeletionFile> deletionFiles;
+
+        private SplitInfo(
+                BinaryRow partition,
+                int bucket,
+                boolean rawConvertible,
+                String bucketPath,
+                @Nullable List<DeletionFile> deletionFiles) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.rawConvertible = rawConvertible;
+            this.bucketPath = bucketPath;
+            this.deletionFiles = deletionFiles;
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(
+                    new Object[] {partition, bucket, rawConvertible, bucketPath, deletionFiles});
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+
+            if (!(obj instanceof SplitInfo)) {
+                return false;
+            }
+
+            SplitInfo that = (SplitInfo) obj;
+
+            return Objects.equals(partition, that.partition)
+                    && bucket == that.bucket
+                    && rawConvertible == that.rawConvertible
+                    && Objects.equals(bucketPath, that.bucketPath)
+                    && Objects.equals(deletionFiles, that.deletionFiles);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index cb3c9fc7e136..c1e5c2e47e37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -40,7 +40,6 @@
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.PlanImpl;
-import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.types.RowType;
@@ -290,11 +289,10 @@ private List<DataSplit> generateSplits(
                         : null;
         for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
             List<DataFileMeta> dataFiles = splitGroup.files;
-            builder.withDataFiles(dataFiles);
-            builder.rawFiles(
-                    splitGroup.rawConvertible
-                            ? convertToRawFiles(partition, bucket, dataFiles)
-                            : Collections.emptyList());
+            String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
+            builder.withDataFiles(dataFiles)
+                    .rawConvertible(splitGroup.rawConvertible)
+                    .withBucketPath(bucketPath);
             if (deletionVectors) {
                 builder.withDataDeletionFiles(
                         getDeletionFiles(dataFiles, deletionIndexFile));
@@ -376,7 +374,8 @@ private Plan toChangesPlan(
                             .withBucket(bucket)
                             .withBeforeFiles(before)
                             .withDataFiles(data)
-                            .isStreaming(isStreaming);
+                            .isStreaming(isStreaming)
+                            .withBucketPath(pathFactory.bucketPath(part, bucket).toString());
             if (deletionVectors) {
                 IndexFileMeta beforeDeletionIndex =
                         indexFileHandler
@@ -438,28 +437,4 @@ private List<DeletionFile> getDeletionFiles(
 
         return deletionFiles;
     }
-
-    private List<RawFile> convertToRawFiles(
-            BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
-        String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
-        return dataFiles.stream()
-                .map(f -> makeRawTableFile(bucketPath, f))
-                .collect(Collectors.toList());
-    }
-
-    private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
-        return new RawFile(
-                bucketPath + "/" + meta.fileName(),
-                0,
-                meta.fileSize(),
-                meta.fileFormat()
-                        .map(t -> t.toString().toLowerCase())
-                        .orElse(
-                                new CoreOptions(tableSchema.options())
-                                        .formatType()
-                                        .toString()
-                                        .toLowerCase()),
-                meta.schemaId(),
-                meta.rowCount());
-    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 6adc3aff04f8..e71f5d281762 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -439,6 +439,8 @@ public List<KeyValue> readKvsFromManifestEntries(
                                         .withBucket(entryWithBucket.getKey())
                                         .withDataFiles(entryWithBucket.getValue())
                                         .isStreaming(isStreaming)
+                                        .rawConvertible(false)
+                                        .withBucketPath("not used")
                                         .build()));
         while (iterator.hasNext()) {
             kvs.add(iterator.next().copy(keySerializer, valueSerializer));
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 3eff9b7cdf33..b8950772a799 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -117,6 +117,7 @@ private DataSplit newSplit(DataFileMeta... files) {
                 .withPartition(EMPTY_ROW)
                 .withBucket(0)
                 .withDataFiles(Arrays.asList(files))
+                .withBucketPath("") // not used
                 .build();
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 806c869f94f3..5652fcd43700 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -246,6 +246,7 @@ private List<KeyValue> writeThenRead(
                                 entry.getValue().stream()
                                         .map(ManifestEntry::file)
                                         .collect(Collectors.toList()))
+                        .withBucketPath("not used")
                         .build());
         RecordReaderIterator<KeyValue> actualIterator = new RecordReaderIterator<>(reader);
         while (actualIterator.hasNext()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 7fbd8f96db8b..b5dda8df7012 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -50,6 +50,7 @@ public void testSerializer() throws IOException {
                         .withPartition(data.partition)
                         .withBucket(data.bucket)
                         .withDataFiles(files)
+                        .withBucketPath("my path")
                         .build();
 
         ByteArrayOutputStream out = new ByteArrayOutputStream();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
index a34f16a0305d..5788e97221e9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
@@ -38,6 +39,7 @@
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IndexFile;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -55,6 +57,7 @@
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link SnapshotReader}. */
@@ -255,6 +258,82 @@ public void testGetAppendOnlyRawFiles() throws Exception {
         commit.close();
     }
 
+    @Test
+    public void testGetAppendOnlyIndexFiles() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(rowType, Collections.emptyList(), Collections.emptyList());
+
+        String commitUser = UUID.randomUUID().toString();
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        SnapshotReader reader = table.newSnapshotReader();
+
+        // write one file
+
+        write.write(GenericRow.of(11, 1101L));
+        write.write(GenericRow.of(12, 1201L));
+        write.write(GenericRow.of(21, 2101L));
+        write.write(GenericRow.of(22, 2201L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        List<DataSplit> dataSplits = reader.read().dataSplits();
+        assertThat(dataSplits).hasSize(1);
+        DataSplit dataSplit = dataSplits.get(0);
+        assertThat(dataSplit.dataFiles()).hasSize(1);
+        DataFileMeta meta = dataSplit.dataFiles().get(0);
+        assertThat(dataSplit.indexFiles())
+                .hasValue(
+                        Collections.singletonList(
+                                new IndexFile(
+                                        String.format(
+                                                "%s/bucket-0/%s" + INDEX_PATH_SUFFIX,
+                                                tablePath,
+                                                meta.fileName()))));
+
+        // change file schema
+
+        write.close();
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.STRING()));
+        table = table.copyWithLatestSchema();
+        write = table.newWrite(commitUser);
+
+        // write another file
+
+        write.write(GenericRow.of(11, 1102L, BinaryString.fromString("eleven")));
+        write.write(GenericRow.of(12, 1202L, BinaryString.fromString("twelve")));
+        write.write(GenericRow.of(21, 2102L, BinaryString.fromString("twenty-one")));
+        write.write(GenericRow.of(22, 2202L, BinaryString.fromString("twenty-two")));
+        commit.commit(2, write.prepareCommit(false, 2));
+
+        dataSplits = reader.read().dataSplits();
+        assertThat(dataSplits).hasSize(1);
+        dataSplit = dataSplits.get(0);
+        assertThat(dataSplit.dataFiles()).hasSize(2);
+        DataFileMeta meta0 = dataSplit.dataFiles().get(0);
+        DataFileMeta meta1 = dataSplit.dataFiles().get(1);
+        assertThat(dataSplit.indexFiles())
+                .hasValue(
+                        Arrays.asList(
+                                new IndexFile(
+                                        String.format(
+                                                "%s/bucket-0/%s" + INDEX_PATH_SUFFIX,
+                                                tablePath,
+                                                meta0.fileName())),
+                                new IndexFile(
+                                        String.format(
+                                                "%s/bucket-0/%s" + INDEX_PATH_SUFFIX,
+                                                tablePath,
+                                                meta1.fileName()))));
+
+        write.close();
+        commit.close();
+    }
+
     private FileStoreTable createFileStoreTable(
             RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
             throws Exception {
@@ -265,6 +344,14 @@ private FileStoreTable createFileStoreTable(
         Map<String, String> formatPerLevel = new HashMap<>();
         formatPerLevel.put("5", "orc");
         options.set(CoreOptions.FILE_FORMAT_PER_LEVEL, formatPerLevel);
+        // test read with extra files
+        options.set(
+                CoreOptions.FILE_INDEX
+                        + "."
+                        + BloomFilterFileIndex.BLOOM_FILTER
+                        + "."
+                        + CoreOptions.COLUMNS,
+                rowType.getFieldNames().get(0));
 
         SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
         TableSchema tableSchema =
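
The test above turns on a bloom-filter file index so that the writer attaches an extra index file (with the INDEX_PATH_SUFFIX suffix) to every data file, which is what DataSplit#indexFiles() later picks up. A small sketch of the equivalent table options follows; the literal option key is an assumption about how CoreOptions.FILE_INDEX + "." + BloomFilterFileIndex.BLOOM_FILTER + "." + CoreOptions.COLUMNS resolves, and the class name is illustrative.

import java.util.HashMap;
import java.util.Map;

public class FileIndexOptionSketch {

    // Table options that should make the writer produce one index extra file per data file.
    static Map<String, String> bloomFilterIndexOptions(String columns) {
        Map<String, String> options = new HashMap<>();
        // assumed resolved key of the composed option name used in the test above
        options.put("file-index.bloom-filter.columns", columns);
        return options;
    }
}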
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
index d6c172fed45a..070b6f6b8f62 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
@@ -34,7 +34,9 @@
  * contain any {@link org.apache.paimon.table.source.Split}.
  */
 public class PlaceholderSplit extends DataSplit {
+
     private static final long serialVersionUID = 3L;
+    private static final String NO_USE_BUCKET_PATH = "/no-used";
 
     private final DataSplit dataSplit;
 
@@ -47,6 +49,8 @@ public PlaceholderSplit(long snapshotId) {
                         .withDataFiles(Collections.emptyList())
                         .withPartition(BinaryRow.EMPTY_ROW)
                         .isStreaming(true)
+                        .rawConvertible(false)
+                        .withBucketPath(NO_USE_BUCKET_PATH)
                         .build();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index b41da234eba7..a3b327027d9c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -103,6 +103,7 @@ public void testOrderResult() throws Exception {
                             .withPartition(entry.partition())
                             .withBucket(entry.bucket())
                             .withDataFiles(Collections.singletonList(entry.file()))
+                            .withBucketPath("not used")
                             .build();
 
             final AtomicInteger i = new AtomicInteger(Integer.MIN_VALUE);
@@ -128,6 +129,7 @@ public void testOrderResult() throws Exception {
                             .withPartition(entry.partition())
                             .withBucket(entry.bucket())
                             .withDataFiles(Collections.singletonList(entry.file()))
+                            .withBucketPath("not used")
                             .build();
 
             i.set(Integer.MIN_VALUE);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 1a3f424245e1..77572bf012c9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -837,6 +837,8 @@ private static DataSplit createDataSplit(
                 .withBucket(bucket)
                 .withDataFiles(files)
                 .isStreaming(true)
+                .rawConvertible(false)
+                .withBucketPath("") // not used
                 .build();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
index 419a8c4782dd..446cbec0e902 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
@@ -61,6 +61,7 @@ protected FileStoreSourceSplit createSnapshotSplit(
                         .withBucket(bucket)
                         .withDataFiles(files)
                         .isStreaming(true)
+                        .withBucketPath("/temp/xxx") // not used
                         .build(),
                 0);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index ef364a1ed278..e32ddbd7ea7a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -122,6 +122,8 @@ private DataSplit dataSplit(int partition, int bucket, String... fileNames) {
                 .withBucket(bucket)
                 .isStreaming(false)
                 .withDataFiles(metas)
+                .rawConvertible(false)
+                .withBucketPath("/") // not used
                 .build();
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index 8d7d4c04e172..e4f9473ebccc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -126,6 +126,8 @@ public static FileStoreSourceSplit newSourceSplit(
                         .withBucket(bucket)
                         .withDataFiles(files)
                         .isStreaming(isIncremental)
+                        .rawConvertible(false)
+                        .withBucketPath("/temp/" + bucket) // no used
                         .build();
         return new FileStoreSourceSplit(id, split, recordsToSkip);
     }
 
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index 7d152c444778..755b2482f335 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -76,6 +76,8 @@ public void testWriteAndRead() throws Exception {
                                 .filter(d -> d.partition.equals(wantedPartition))
                                 .map(d -> d.meta)
                                 .collect(Collectors.toList()))
+                        .rawConvertible(false)
+                        .withBucketPath("not used")
                         .build();
 
         PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), dataSplit, null);
diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml
index 9cca628cbe56..e435174f810a 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -135,10 +135,6 @@ under the License.
                 <exclusion>
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>org.apache.orc</groupId>
-                    <artifactId>orc-core</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 01a2ca165411..556bd82cd69a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -64,25 +64,22 @@ trait ScanHelper {
     var currentSplit: Option[DataSplit] = None
     val currentDataFiles = new ArrayBuffer[DataFileMeta]
     val currentDeletionFiles = new ArrayBuffer[DeletionFile]
-    val currentRawFiles = new ArrayBuffer[RawFile]
     var currentSize = 0L
 
     def closeDataSplit(): Unit = {
       if (currentSplit.nonEmpty && currentDataFiles.nonEmpty) {
         val newSplit =
-          copyDataSplit(currentSplit.get, currentDataFiles, currentDeletionFiles, currentRawFiles)
+          copyDataSplit(currentSplit.get, currentDataFiles, currentDeletionFiles)
         newSplits += newSplit
       }
       currentDataFiles.clear()
       currentDeletionFiles.clear()
-      currentRawFiles.clear()
       currentSize = 0
     }
 
     splits.foreach {
       split =>
         currentSplit = Some(split)
-        val hasRawFiles = split.convertToRawFiles().isPresent
         split.dataFiles().asScala.zipWithIndex.foreach {
           case (file, idx) =>
@@ -94,9 +91,6 @@ trait ScanHelper {
             if (deletionVectors) {
               currentDeletionFiles += split.deletionFiles().get().get(idx)
             }
-            if (hasRawFiles) {
-              currentRawFiles += split.convertToRawFiles().get().get(idx)
-            }
         }
         closeDataSplit()
     }
@@ -115,15 +109,15 @@ trait ScanHelper {
   private def copyDataSplit(
       split: DataSplit,
       dataFiles: Seq[DataFileMeta],
-      deletionFiles: Seq[DeletionFile],
-      rawFiles: Seq[RawFile]): DataSplit = {
+      deletionFiles: Seq[DeletionFile]): DataSplit = {
     val builder = DataSplit
       .builder()
       .withSnapshot(split.snapshotId())
       .withPartition(split.partition())
       .withBucket(split.bucket())
       .withDataFiles(dataFiles.toList.asJava)
-      .rawFiles(rawFiles.toList.asJava)
+      .rawConvertible(split.rawConvertible())
+      .withBucketPath(split.getBucketPath)
     if (deletionVectors) {
       builder.withDataDeletionFiles(deletionFiles.toList.asJava)
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 95efb7a23bcf..0ac0b14bbe6c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -120,7 +120,10 @@ case class DeleteFromPaimonTableCommand(
       }
 
       // Step4: build a dataframe that contains the unchanged data, and write out them.
-      val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
+      val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
+        touchedFiles,
+        rawConvertible = true,
+        table.store().pathFactory())
       val toRewriteScanRelation = Filter(
         Not(condition),
         Compatibility.createDataSourceV2ScanRelation(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index cb3266157726..8299f6b8c66b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -48,7 +48,10 @@ object SparkDataFileMeta {
     }
   }
 
-  def convertToDataSplits(sparkDataFiles: Array[SparkDataFileMeta]): Array[DataSplit] = {
+  def convertToDataSplits(
+      sparkDataFiles: Array[SparkDataFileMeta],
+      rawConvertible: Boolean,
+      pathFactory: FileStorePathFactory): Array[DataSplit] = {
     sparkDataFiles
       .groupBy(file => (file.partition, file.bucket))
       .map {
@@ -57,6 +60,8 @@ object SparkDataFileMeta {
           .withPartition(partition)
           .withBucket(bucket)
           .withDataFiles(files.map(_.dataFileMeta).toList.asJava)
+          .rawConvertible(rawConvertible)
+          .withBucketPath(pathFactory.bucketPath(partition, bucket).toString)
           .build()
       }
       .toArray
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 6c16ce5e8b8b..ee41c5ebb596 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -102,7 +102,11 @@ case class UpdatePaimonTableCommand(
         }
         new Column(updated).as(origin.name, origin.metadata)
     }
-    val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
+    // Append-only files always set rawConvertible to true.
+    val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
+      touchedFiles,
+      rawConvertible = true,
+      table.store().pathFactory())
     val toUpdateScanRelation = DataSourceV2ScanRelation(
       relation,
       PaimonSplitScan(table, touchedDataSplits),
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index c6bda9e7cb0e..7b150c1fc8a6 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -38,13 +38,10 @@ class ScanHelperTest extends PaimonSparkTestBase {
     val fileNum = 100
     val files = scala.collection.mutable.ListBuffer.empty[DataFileMeta]
-    val rawFiles = scala.collection.mutable.ListBuffer.empty[RawFile]
     0.until(fileNum).foreach {
       i =>
         val path = s"f$i.parquet"
         files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 1)
-
-        rawFiles += new RawFile(s"/a/b/$path", 0, 75000, "parquet", 0, 30000)
     }
 
     val dataSplits = mutable.ArrayBuffer.empty[Split]
@@ -56,7 +53,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
         .withBucket(0)
         .withPartition(new BinaryRow(0))
         .withDataFiles(files.zipWithIndex.filter(_._2 % splitNum == i).map(_._1).toList.asJava)
-        .rawFiles(rawFiles.zipWithIndex.filter(_._2 % splitNum == i).map(_._1).toList.asJava)
+        .rawConvertible(true)
+        .withBucketPath("no use")
        .build()
     }
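
Because rawConvertible and bucketPath are plain split-level fields, re-slicing a large split into smaller ones (what ScanHelper does above) no longer has to carve up a parallel RawFile list; each sub-split simply copies the parent's flag and path. A rough Java analogue of the Scala copyDataSplit, with deletion files omitted for brevity and an illustrative class name:

import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;

import java.util.List;

public class SplitSlicingSketch {

    // A sub-split keeps the parent's rawConvertible flag and bucket path; RawFile entries for the
    // subset of files are recomputed lazily by convertToRawFiles() when needed.
    static DataSplit copyWithFiles(DataSplit parent, List<DataFileMeta> subsetOfFiles) {
        return DataSplit.builder()
                .withSnapshot(parent.snapshotId())
                .withPartition(parent.partition())
                .withBucket(parent.bucket())
                .withDataFiles(subsetOfFiles)
                .rawConvertible(parent.rawConvertible())
                .withBucketPath(parent.getBucketPath())
                .build();
    }
}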