From 8ff2537b2a0ac43365746193c59bdd6977e9394d Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 15 Apr 2024 16:48:16 +0800 Subject: [PATCH] [core] Introduce RawFileSplitRead to accelerate batch read for primary key table (#3209) --- .../org/apache/paimon/utils/LazyField.java | 48 ++++ .../apache/paimon/AppendOnlyFileStore.java | 6 +- .../java/org/apache/paimon/FileStore.java | 4 +- .../org/apache/paimon/KeyValueFileStore.java | 17 +- .../ApplyDeletionVectorReader.java | 13 -- .../operation/AppendOnlyFileStoreRead.java | 192 ---------------- .../operation/AppendOnlyFileStoreWrite.java | 4 +- ...StoreRead.java => MergeFileSplitRead.java} | 25 +- .../paimon/operation/RawFileSplitRead.java | 216 ++++++++++++++++++ .../{FileStoreRead.java => SplitRead.java} | 4 +- .../table/AppendOnlyFileStoreTable.java | 12 +- .../table/PrimaryKeyFileStoreTable.java | 23 +- .../table/source/AbstractDataTableRead.java | 10 +- .../table/source/KeyValueTableRead.java | 123 +++++++--- .../apache/paimon/table/source/TableRead.java | 4 +- .../java/org/apache/paimon/TestFileStore.java | 4 +- ...dTest.java => MergeFileSplitReadTest.java} | 6 +- .../flink/action/MergeIntoActionITCase.java | 6 +- .../source/TestChangelogDataReadWrite.java | 46 ++-- .../flink/util/ReadWriteTableTestUtil.java | 22 +- 20 files changed, 455 insertions(+), 330 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java delete mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java rename paimon-core/src/main/java/org/apache/paimon/operation/{KeyValueFileStoreRead.java => MergeFileSplitRead.java} (94%) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java rename paimon-core/src/main/java/org/apache/paimon/operation/{FileStoreRead.java => SplitRead.java} (93%) rename paimon-core/src/test/java/org/apache/paimon/operation/{KeyValueFileStoreReadTest.java => MergeFileSplitReadTest.java} (99%) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java b/paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java new file mode 100644 index 000000000000..2bb701362507 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.util.function.Supplier; + +/** A class to lazy initialized field. */ +public class LazyField { + + private final Supplier supplier; + + private boolean initialized; + private T value; + + public LazyField(Supplier supplier) { + this.supplier = supplier; + } + + public T get() { + if (!initialized) { + T t = supplier.get(); + value = t; + initialized = true; + return t; + } + return value; + } + + public boolean initialized() { + return initialized; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 0d546e2154d5..186fd479922f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -22,9 +22,9 @@ import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.manifest.ManifestCacheFilter; -import org.apache.paimon.operation.AppendOnlyFileStoreRead; import org.apache.paimon.operation.AppendOnlyFileStoreScan; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; +import org.apache.paimon.operation.RawFileSplitRead; import org.apache.paimon.operation.ScanBucketFilter; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; @@ -79,8 +79,8 @@ public AppendOnlyFileStoreScan newScan(String branchName) { } @Override - public AppendOnlyFileStoreRead newRead() { - return new AppendOnlyFileStoreRead( + public RawFileSplitRead newRead() { + return new RawFileSplitRead( fileIO, schemaManager, schema, diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index cd38d20611fc..6731121c567b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -23,11 +23,11 @@ import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.operation.FileStoreCommit; -import org.apache.paimon.operation.FileStoreRead; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.operation.SplitRead; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.service.ServiceManager; import org.apache.paimon.stats.StatsFileHandler; @@ -73,7 +73,7 @@ public interface FileStore extends Serializable { StatsFileHandler newStatsFileHandler(); - FileStoreRead newRead(); + SplitRead newRead(); FileStoreWrite newWrite(String commitUser); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 956b615d7884..354cc6dda195 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -28,9 +28,10 @@ import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; -import org.apache.paimon.operation.KeyValueFileStoreRead; import org.apache.paimon.operation.KeyValueFileStoreScan; import org.apache.paimon.operation.KeyValueFileStoreWrite; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.RawFileSplitRead; import org.apache.paimon.operation.ScanBucketFilter; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.KeyValueFieldsExtractor; @@ -119,8 +120,8 @@ public KeyValueFileStoreScan newScan(String branchName) { } @Override - public KeyValueFileStoreRead newRead() { - return new KeyValueFileStoreRead( + public MergeFileSplitRead newRead() { + return new MergeFileSplitRead( options, schema, keyType, @@ -130,6 +131,16 @@ public KeyValueFileStoreRead newRead() { newReaderFactoryBuilder()); } + public RawFileSplitRead newBatchRawFileRead() { + return new RawFileSplitRead( + fileIO, + schemaManager, + schema, + valueType, + FileFormatDiscover.of(options), + pathFactory()); + } + public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { return KeyValueFileReaderFactory.builder( fileIO, diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java index dadde99eac5f..6cc8b396f888 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java @@ -24,7 +24,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.Optional; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -40,18 +39,6 @@ public ApplyDeletionVectorReader(RecordReader reader, DeletionVector deletion this.deletionVector = deletionVector; } - public static RecordReader create(RecordReader reader, Optional dv) { - return create(reader, dv.orElse(null)); - } - - public static RecordReader create(RecordReader reader, @Nullable DeletionVector dv) { - if (dv == null) { - return reader; - } - - return new ApplyDeletionVectorReader<>(reader, dv); - } - @Nullable @Override public RecordIterator readBatch() throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java deleted file mode 100644 index 49eea905c4b1..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.operation; - -import org.apache.paimon.AppendOnlyFileStore; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.format.FileFormatDiscover; -import org.apache.paimon.format.FormatKey; -import org.apache.paimon.format.FormatReaderContext; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.io.DataFilePathFactory; -import org.apache.paimon.io.FileRecordReader; -import org.apache.paimon.mergetree.compact.ConcatRecordReader; -import org.apache.paimon.partition.PartitionUtils; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.IndexCastMapping; -import org.apache.paimon.schema.SchemaEvolutionUtil; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.BulkFormatMapping; -import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.Projection; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; - -/** {@link FileStoreRead} for {@link AppendOnlyFileStore}. */ -public class AppendOnlyFileStoreRead implements FileStoreRead { - - private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyFileStoreRead.class); - - private final FileIO fileIO; - private final SchemaManager schemaManager; - private final TableSchema schema; - private final FileFormatDiscover formatDiscover; - private final FileStorePathFactory pathFactory; - private final Map bulkFormatMappings; - - private int[][] projection; - - @Nullable private List filters; - - public AppendOnlyFileStoreRead( - FileIO fileIO, - SchemaManager schemaManager, - TableSchema schema, - RowType rowType, - FileFormatDiscover formatDiscover, - FileStorePathFactory pathFactory) { - this.fileIO = fileIO; - this.schemaManager = schemaManager; - this.schema = schema; - this.formatDiscover = formatDiscover; - this.pathFactory = pathFactory; - this.bulkFormatMappings = new HashMap<>(); - - this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes(); - } - - public FileStoreRead withProjection(int[][] projectedFields) { - projection = projectedFields; - return this; - } - - @Override - public FileStoreRead withFilter(Predicate predicate) { - this.filters = splitAnd(predicate); - return this; - } - - @Override - public RecordReader createReader(DataSplit split) throws IOException { - DataFilePathFactory dataFilePathFactory = - pathFactory.createDataFilePathFactory(split.partition(), split.bucket()); - List> suppliers = new ArrayList<>(); - if (split.beforeFiles().size() > 0) { - LOG.info("Ignore split before files: " + split.beforeFiles()); - } - for (DataFileMeta file : split.dataFiles()) { - String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName()); - BulkFormatMapping bulkFormatMapping = - bulkFormatMappings.computeIfAbsent( - new FormatKey(file.schemaId(), formatIdentifier), - key -> { - TableSchema tableSchema = schema; - TableSchema dataSchema = - key.schemaId == schema.id() - ? schema - : schemaManager.schema(key.schemaId); - - // projection to data schema - int[][] dataProjection = - SchemaEvolutionUtil.createDataProjection( - tableSchema.fields(), - dataSchema.fields(), - projection); - - IndexCastMapping indexCastMapping = - SchemaEvolutionUtil.createIndexCastMapping( - Projection.of(projection).toTopLevelIndexes(), - tableSchema.fields(), - Projection.of(dataProjection).toTopLevelIndexes(), - dataSchema.fields()); - - List dataFilters = - this.schema.id() == key.schemaId - ? filters - : SchemaEvolutionUtil.createDataFilters( - tableSchema.fields(), - dataSchema.fields(), - filters); - - Pair partitionPair = null; - if (!dataSchema.partitionKeys().isEmpty()) { - Pair partitionMapping = - PartitionUtils.constructPartitionMapping( - dataSchema, dataProjection); - // if partition fields are not selected, we just do nothing - if (partitionMapping != null) { - dataProjection = partitionMapping.getRight(); - partitionPair = - Pair.of( - partitionMapping.getLeft(), - dataSchema.projectedLogicalRowType( - dataSchema.partitionKeys())); - } - } - - RowType projectedRowType = - Projection.of(dataProjection) - .project(dataSchema.logicalRowType()); - - return new BulkFormatMapping( - indexCastMapping.getIndexMapping(), - indexCastMapping.getCastMapping(), - partitionPair, - formatDiscover - .discover(formatIdentifier) - .createReaderFactory( - projectedRowType, dataFilters)); - }); - - final BinaryRow partition = split.partition(); - suppliers.add( - () -> - new FileRecordReader( - bulkFormatMapping.getReaderFactory(), - new FormatReaderContext( - fileIO, - dataFilePathFactory.toPath(file.fileName()), - file.fileSize()), - bulkFormatMapping.getIndexMapping(), - bulkFormatMapping.getCastMapping(), - PartitionUtils.create( - bulkFormatMapping.getPartitionPair(), partition))); - } - - return ConcatRecordReader.create(suppliers); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index a0d86337139e..b3361b0df735 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -59,7 +59,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite { private final FileIO fileIO; - private final AppendOnlyFileStoreRead read; + private final RawFileSplitRead read; private final long schemaId; private final RowType rowType; private final FileFormat fileFormat; @@ -81,7 +81,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite public AppendOnlyFileStoreWrite( FileIO fileIO, - AppendOnlyFileStoreRead read, + RawFileSplitRead read, long schemaId, String commitUser, RowType rowType, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java similarity index 94% rename from paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java rename to paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java index b0ab4338f117..d457c9093650 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java @@ -64,8 +64,13 @@ import static org.apache.paimon.predicate.PredicateBuilder.containsFields; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; -/** {@link FileStoreRead} implementation for {@link KeyValueFileStore}. */ -public class KeyValueFileStoreRead implements FileStoreRead { +/** + * An implementation for {@link KeyValueFileStore}, this class handle LSM merging and changelog row + * kind things, it will force reading fields such as sequence and row_kind. + * + * @see RawFileSplitRead If in batch mode and reading raw files, it is recommended to use this read. + */ +public class MergeFileSplitRead implements SplitRead { private final TableSchema tableSchema; private final FileIO fileIO; @@ -86,7 +91,7 @@ public class KeyValueFileStoreRead implements FileStoreRead { private boolean forceKeepDelete = false; - public KeyValueFileStoreRead( + public MergeFileSplitRead( CoreOptions options, TableSchema schema, RowType keyType, @@ -105,13 +110,13 @@ public KeyValueFileStoreRead( this.sequenceFields = options.sequenceField(); } - public KeyValueFileStoreRead withKeyProjection(@Nullable int[][] projectedFields) { + public MergeFileSplitRead withKeyProjection(@Nullable int[][] projectedFields) { readerFactoryBuilder.withKeyProjection(projectedFields); this.keyProjectedFields = projectedFields; return this; } - public KeyValueFileStoreRead withValueProjection(@Nullable int[][] projectedFields) { + public MergeFileSplitRead withValueProjection(@Nullable int[][] projectedFields) { if (projectedFields == null) { return this; } @@ -155,18 +160,22 @@ public KeyValueFileStoreRead withValueProjection(@Nullable int[][] projectedFiel return this; } - public KeyValueFileStoreRead withIOManager(IOManager ioManager) { + public MergeFileSplitRead withIOManager(IOManager ioManager) { this.mergeSorter.setIOManager(ioManager); return this; } - public KeyValueFileStoreRead forceKeepDelete() { + public MergeFileSplitRead forceKeepDelete() { this.forceKeepDelete = true; return this; } @Override - public FileStoreRead withFilter(Predicate predicate) { + public MergeFileSplitRead withFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + List allFilters = new ArrayList<>(); List pkFilters = null; List primaryKeys = tableSchema.trimmedPrimaryKeys(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java new file mode 100644 index 000000000000..c801dcaa87d8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; +import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.format.FileFormatDiscover; +import org.apache.paimon.format.FormatKey; +import org.apache.paimon.format.FormatReaderContext; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.FileRecordReader; +import org.apache.paimon.mergetree.compact.ConcatRecordReader; +import org.apache.paimon.partition.PartitionUtils; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.IndexCastMapping; +import org.apache.paimon.schema.SchemaEvolutionUtil; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BulkFormatMapping; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Projection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; + +/** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */ +public class RawFileSplitRead implements SplitRead { + + private static final Logger LOG = LoggerFactory.getLogger(RawFileSplitRead.class); + + private final FileIO fileIO; + private final SchemaManager schemaManager; + private final TableSchema schema; + private final FileFormatDiscover formatDiscover; + private final FileStorePathFactory pathFactory; + private final Map bulkFormatMappings; + + private int[][] projection; + + @Nullable private List filters; + + public RawFileSplitRead( + FileIO fileIO, + SchemaManager schemaManager, + TableSchema schema, + RowType rowType, + FileFormatDiscover formatDiscover, + FileStorePathFactory pathFactory) { + this.fileIO = fileIO; + this.schemaManager = schemaManager; + this.schema = schema; + this.formatDiscover = formatDiscover; + this.pathFactory = pathFactory; + this.bulkFormatMappings = new HashMap<>(); + + this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes(); + } + + public RawFileSplitRead withProjection(int[][] projectedFields) { + if (projectedFields != null) { + projection = projectedFields; + } + return this; + } + + @Override + public RawFileSplitRead withFilter(Predicate predicate) { + if (predicate != null) { + this.filters = splitAnd(predicate); + } + return this; + } + + @Override + public RecordReader createReader(DataSplit split) throws IOException { + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(split.partition(), split.bucket()); + List> suppliers = new ArrayList<>(); + if (split.beforeFiles().size() > 0) { + LOG.info("Ignore split before files: " + split.beforeFiles()); + } + + DeletionVector.Factory dvFactory = + DeletionVector.factory( + fileIO, split.dataFiles(), split.deletionFiles().orElse(null)); + + for (DataFileMeta file : split.dataFiles()) { + String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName()); + BulkFormatMapping bulkFormatMapping = + bulkFormatMappings.computeIfAbsent( + new FormatKey(file.schemaId(), formatIdentifier), + this::createBulkFormatMapping); + + BinaryRow partition = split.partition(); + suppliers.add( + () -> + createFileReader( + partition, + file, + dataFilePathFactory, + bulkFormatMapping, + dvFactory)); + } + + return ConcatRecordReader.create(suppliers); + } + + private BulkFormatMapping createBulkFormatMapping(FormatKey key) { + TableSchema tableSchema = schema; + TableSchema dataSchema = + key.schemaId == schema.id() ? schema : schemaManager.schema(key.schemaId); + + // projection to data schema + int[][] dataProjection = + SchemaEvolutionUtil.createDataProjection( + tableSchema.fields(), dataSchema.fields(), projection); + + IndexCastMapping indexCastMapping = + SchemaEvolutionUtil.createIndexCastMapping( + Projection.of(projection).toTopLevelIndexes(), + tableSchema.fields(), + Projection.of(dataProjection).toTopLevelIndexes(), + dataSchema.fields()); + + List dataFilters = + this.schema.id() == key.schemaId + ? filters + : SchemaEvolutionUtil.createDataFilters( + tableSchema.fields(), dataSchema.fields(), filters); + + Pair partitionPair = null; + if (!dataSchema.partitionKeys().isEmpty()) { + Pair partitionMapping = + PartitionUtils.constructPartitionMapping(dataSchema, dataProjection); + // if partition fields are not selected, we just do nothing + if (partitionMapping != null) { + dataProjection = partitionMapping.getRight(); + partitionPair = + Pair.of( + partitionMapping.getLeft(), + dataSchema.projectedLogicalRowType(dataSchema.partitionKeys())); + } + } + + RowType projectedRowType = + Projection.of(dataProjection).project(dataSchema.logicalRowType()); + + return new BulkFormatMapping( + indexCastMapping.getIndexMapping(), + indexCastMapping.getCastMapping(), + partitionPair, + formatDiscover + .discover(key.format) + .createReaderFactory(projectedRowType, dataFilters)); + } + + private RecordReader createFileReader( + BinaryRow partition, + DataFileMeta file, + DataFilePathFactory dataFilePathFactory, + BulkFormatMapping bulkFormatMapping, + DeletionVector.Factory dvFactory) + throws IOException { + FileRecordReader fileRecordReader = + new FileRecordReader( + bulkFormatMapping.getReaderFactory(), + new FormatReaderContext( + fileIO, + dataFilePathFactory.toPath(file.fileName()), + file.fileSize()), + bulkFormatMapping.getIndexMapping(), + bulkFormatMapping.getCastMapping(), + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + + Optional deletionVector = dvFactory.create(file.fileName()); + if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { + return new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get()); + } + return fileRecordReader; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java similarity index 93% rename from paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java rename to paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java index 2d3e121b1c45..a5b05e8f4a4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java @@ -29,9 +29,9 @@ * * @param type of record to read. */ -public interface FileStoreRead { +public interface SplitRead { - FileStoreRead withFilter(Predicate predicate); + SplitRead withFilter(Predicate predicate); /** Create a {@link RecordReader} from split. */ RecordReader createReader(DataSplit split) throws IOException; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 2eb41fdd50a7..810669f2e7f1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -24,11 +24,11 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestCacheFilter; -import org.apache.paimon.operation.AppendOnlyFileStoreRead; import org.apache.paimon.operation.AppendOnlyFileStoreScan; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.Lock; +import org.apache.paimon.operation.RawFileSplitRead; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; @@ -112,8 +112,14 @@ protected BiConsumer nonPartitionFilterConsumer() { @Override public InnerTableRead newRead() { - AppendOnlyFileStoreRead read = store().newRead(); - return new AbstractDataTableRead(read, schema()) { + RawFileSplitRead read = store().newRead(); + return new AbstractDataTableRead(schema()) { + + @Override + protected InnerTableRead innerWithFilter(Predicate predicate) { + read.withFilter(predicate); + return this; + } @Override public void projection(int[][] projection) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index f2ab323d272d..bf26ec31cf2f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -32,7 +32,6 @@ import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; @@ -42,7 +41,6 @@ import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; -import org.apache.paimon.table.source.ValueContentRowDataRecordIterator; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -157,25 +155,8 @@ protected BiConsumer nonPartitionFilterConsumer() { @Override public InnerTableRead newRead() { - return new KeyValueTableRead(store().newRead(), schema()) { - - @Override - public void projection(int[][] projection) { - read.withValueProjection(projection); - } - - @Override - protected RecordReader.RecordIterator rowDataRecordIteratorFromKv( - RecordReader.RecordIterator kvRecordIterator) { - return new ValueContentRowDataRecordIterator(kvRecordIterator); - } - - @Override - public InnerTableRead forceKeepDelete() { - read.forceKeepDelete(); - return this; - } - }; + return new KeyValueTableRead( + () -> store().newRead(), () -> store().newBatchRawFileRead(), schema()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java index 930cddcd589d..bdb548d61954 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java @@ -21,7 +21,6 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.operation.DefaultValueAssigner; -import org.apache.paimon.operation.FileStoreRead; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateProjectionConverter; import org.apache.paimon.reader.RecordReader; @@ -34,15 +33,13 @@ /** A {@link InnerTableRead} for data table. */ public abstract class AbstractDataTableRead implements InnerTableRead { - private final FileStoreRead fileStoreRead; private final DefaultValueAssigner defaultValueAssigner; private int[][] projection; private boolean executeFilter = false; private Predicate predicate; - public AbstractDataTableRead(FileStoreRead fileStoreRead, TableSchema schema) { - this.fileStoreRead = fileStoreRead; + public AbstractDataTableRead(TableSchema schema) { this.defaultValueAssigner = schema == null ? null : DefaultValueAssigner.create(schema); } @@ -61,10 +58,11 @@ public final InnerTableRead withFilter(Predicate predicate) { if (defaultValueAssigner != null) { predicate = defaultValueAssigner.handlePredicate(predicate); } - fileStoreRead.withFilter(predicate); - return this; + return innerWithFilter(predicate); } + protected abstract InnerTableRead innerWithFilter(Predicate predicate); + @Override public TableRead executeFilter() { this.executeFilter = true; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index bb0354eee52e..f3f66c4aaf23 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -21,61 +21,120 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.operation.KeyValueFileStoreRead; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.RawFileSplitRead; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.LazyField; import javax.annotation.Nullable; import java.io.IOException; +import java.util.function.Supplier; /** - * An abstraction layer above {@link KeyValueFileStoreRead} to provide reading of {@link - * InternalRow}. + * An abstraction layer above {@link MergeFileSplitRead} to provide reading of {@link InternalRow}. */ -public abstract class KeyValueTableRead extends AbstractDataTableRead { +public final class KeyValueTableRead extends AbstractDataTableRead { - protected final KeyValueFileStoreRead read; + private final LazyField mergeRead; + private final LazyField batchRawRead; - protected KeyValueTableRead(KeyValueFileStoreRead read, TableSchema schema) { - super(read, schema); - // We don't need any key fields, the columns that need to be read are already included in - // the value - this.read = read.withKeyProjection(new int[0][]); - } + private int[][] projection = null; + private boolean forceKeepDelete = false; + private Predicate predicate = null; + private IOManager ioManager = null; - @Override - public TableRead withIOManager(IOManager ioManager) { - read.withIOManager(ioManager); - return this; + public KeyValueTableRead( + Supplier mergeReadSupplier, + Supplier batchRawReadSupplier, + TableSchema schema) { + super(schema); + this.mergeRead = new LazyField<>(() -> createMergeRead(mergeReadSupplier)); + this.batchRawRead = new LazyField<>(() -> createBatchRawRead(batchRawReadSupplier)); } - @Override - public final RecordReader reader(Split split) throws IOException { - return new RowDataRecordReader(read.createReader((DataSplit) split)); + private MergeFileSplitRead createMergeRead(Supplier readSupplier) { + MergeFileSplitRead read = + readSupplier + .get() + .withKeyProjection(new int[0][]) + .withValueProjection(projection) + .withFilter(predicate) + .withIOManager(ioManager); + if (forceKeepDelete) { + read = read.forceKeepDelete(); + } + return read; } - protected abstract RecordReader.RecordIterator rowDataRecordIteratorFromKv( - RecordReader.RecordIterator kvRecordIterator); + private RawFileSplitRead createBatchRawRead(Supplier readSupplier) { + return readSupplier.get().withProjection(projection).withFilter(predicate); + } - private class RowDataRecordReader implements RecordReader { + @Override + public void projection(int[][] projection) { + if (mergeRead.initialized()) { + mergeRead.get().withValueProjection(projection); + } + if (batchRawRead.initialized()) { + batchRawRead.get().withProjection(projection); + } + this.projection = projection; + } - private final RecordReader wrapped; + @Override + public InnerTableRead forceKeepDelete() { + if (mergeRead.initialized()) { + mergeRead.get().forceKeepDelete(); + } + this.forceKeepDelete = true; + return this; + } - private RowDataRecordReader(RecordReader wrapped) { - this.wrapped = wrapped; + @Override + protected InnerTableRead innerWithFilter(Predicate predicate) { + if (mergeRead.initialized()) { + mergeRead.get().withFilter(predicate); } + if (batchRawRead.initialized()) { + batchRawRead.get().withFilter(predicate); + } + this.predicate = predicate; + return this; + } - @Nullable - @Override - public RecordIterator readBatch() throws IOException { - RecordIterator batch = wrapped.readBatch(); - return batch == null ? null : rowDataRecordIteratorFromKv(batch); + @Override + public TableRead withIOManager(IOManager ioManager) { + if (mergeRead.initialized()) { + mergeRead.get().withIOManager(ioManager); } + this.ioManager = ioManager; + return this; + } - @Override - public void close() throws IOException { - wrapped.close(); + @Override + public RecordReader reader(Split split) throws IOException { + DataSplit dataSplit = (DataSplit) split; + if (!forceKeepDelete && !dataSplit.isStreaming() && split.convertToRawFiles().isPresent()) { + return batchRawRead.get().createReader(dataSplit); } + + RecordReader reader = mergeRead.get().createReader(dataSplit); + return new RecordReader() { + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + RecordIterator batch = reader.readBatch(); + return batch == null ? null : new ValueContentRowDataRecordIterator(batch); + } + + @Override + public void close() throws IOException { + reader.close(); + } + }; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java index 72b54ae6f648..1b2c6299ba0f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.mergetree.compact.ConcatRecordReader; -import org.apache.paimon.operation.FileStoreRead; +import org.apache.paimon.operation.SplitRead; import org.apache.paimon.reader.RecordReader; import java.io.IOException; @@ -30,7 +30,7 @@ import java.util.List; /** - * An abstraction layer above {@link FileStoreRead} to provide reading of {@link InternalRow}. + * An abstraction layer above {@link SplitRead} to provide reading of {@link InternalRow}. * * @since 0.4.0 */ diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index b20f89b940cd..6adc3aff04f8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -36,9 +36,9 @@ import org.apache.paimon.operation.AbstractFileStoreWrite; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreCommitImpl; -import org.apache.paimon.operation.FileStoreRead; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.Lock; +import org.apache.paimon.operation.SplitRead; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReaderIterator; @@ -426,7 +426,7 @@ public List readKvsFromManifestEntries( } List kvs = new ArrayList<>(); - FileStoreRead read = newRead(); + SplitRead read = newRead(); for (Map.Entry>> entryWithPartition : filesPerPartitionAndBucket.entrySet()) { for (Map.Entry> entryWithBucket : diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java similarity index 99% rename from paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java rename to paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index c464825566ce..806c869f94f3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -65,8 +65,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link KeyValueFileStoreRead}. */ -public class KeyValueFileStoreReadTest { +/** Tests for {@link MergeFileSplitRead}. */ +public class MergeFileSplitReadTest { @TempDir java.nio.file.Path tempDir; @@ -226,7 +226,7 @@ private List writeThenRead( Map> filesGroupedByPartition = scan.withSnapshot(snapshotId).plan().files().stream() .collect(Collectors.groupingBy(ManifestEntry::partition)); - KeyValueFileStoreRead read = store.newRead(); + MergeFileSplitRead read = store.newRead(); if (keyProjection != null) { read.withKeyProjection(keyProjection); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index 523033579594..99af52ce660f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -124,9 +124,9 @@ public void testVariousChangelogProducer( expected, Arrays.asList( changelogRow("+I", 1, "v_1", "creation", "02-27"), - changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"), - changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"), - changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"), + changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"), + changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"), + changelogRow("+I", 7, "Seven", "matched_upsert", "02-28"), changelogRow("+I", 8, "v_8", "insert", "02-29"), changelogRow("+I", 11, "v_11", "insert", "02-29"), changelogRow("+I", 12, "v_12", "insert", "02-29"))); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index d51aefb962c5..7fd14b1adb86 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; @@ -32,16 +33,15 @@ import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; -import org.apache.paimon.operation.KeyValueFileStoreRead; import org.apache.paimon.operation.KeyValueFileStoreWrite; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.RawFileSplitRead; import org.apache.paimon.options.Options; -import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.table.source.TableRead; -import org.apache.paimon.table.source.ValueContentRowDataRecordIterator; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.IntType; @@ -60,7 +60,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.function.Function; import static java.util.Collections.singletonList; @@ -110,20 +109,13 @@ public TestChangelogDataReadWrite(String root) { } public TableRead createReadWithKey() { - return createRead(ValueContentRowDataRecordIterator::new); - } - - private TableRead createRead( - Function< - RecordReader.RecordIterator, - RecordReader.RecordIterator> - rowDataIteratorCreator) { SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); CoreOptions options = new CoreOptions(new HashMap<>()); - KeyValueFileStoreRead read = - new KeyValueFileStoreRead( + TableSchema schema = schemaManager.schema(0); + MergeFileSplitRead read = + new MergeFileSplitRead( options, - schemaManager.schema(0), + schema, KEY_TYPE, VALUE_TYPE, COMPARATOR, @@ -131,26 +123,22 @@ private TableRead createRead( KeyValueFileReaderFactory.builder( LocalFileIO.create(), schemaManager, - schemaManager.schema(0), + schema, KEY_TYPE, VALUE_TYPE, ignore -> avro, pathFactory, EXTRACTOR, options)); - return new KeyValueTableRead(read, null) { - - @Override - public void projection(int[][] projection) { - throw new UnsupportedOperationException(); - } - - @Override - protected RecordReader.RecordIterator rowDataRecordIteratorFromKv( - RecordReader.RecordIterator kvRecordIterator) { - return rowDataIteratorCreator.apply(kvRecordIterator); - } - }; + RawFileSplitRead rawFileRead = + new RawFileSplitRead( + LocalFileIO.create(), + schemaManager, + schema, + VALUE_TYPE, + FileFormatDiscover.of(options), + pathFactory); + return new KeyValueTableRead(() -> read, () -> rawFileRead, null); } public List writeFiles( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java index 2303ba1bc3ef..fc7e05d75da5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java @@ -30,6 +30,7 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import javax.annotation.Nullable; @@ -261,15 +262,28 @@ public static void testBatchRead(String query, List expected) throws Except CloseableIterator resultItr = bEnv.executeSql(query).collect(); try (BlockingIterator iterator = BlockingIterator.of(resultItr)) { if (!expected.isEmpty()) { - assertThat( - iterator.collect( - expected.size(), TIME_OUT.getSize(), TIME_OUT.getUnit())) - .containsExactlyInAnyOrderElementsOf(expected); + List result = + iterator.collect(expected.size(), TIME_OUT.getSize(), TIME_OUT.getUnit()); + assertThat(toInsertOnlyRows(result)) + .containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(expected)); } assertThat(resultItr.hasNext()).isFalse(); } } + private static List toInsertOnlyRows(List rows) { + List result = new ArrayList<>(); + for (Row row : rows) { + assertThat(row.getKind()).isIn(RowKind.INSERT, RowKind.UPDATE_AFTER); + Row newRow = new Row(row.getArity()); + for (int i = 0; i < row.getArity(); i++) { + newRow.setField(i, row.getField(i)); + } + result.add(newRow); + } + return result; + } + public static BlockingIterator testStreamingRead(String query, List expected) throws Exception { BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect());