Skip to content

Commit

Permalink
Fix1
Browse files: browse the repository at this point in the history
  • Loading branch information
yuzelin committed Apr 1, 2024
1 parent 15e5f72 commit ef3043e
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.ProjectedRow;

Expand All @@ -41,15 +40,13 @@ public class FileRecordReader implements RecordReader<InternalRow> {
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
private final boolean ignoreDelete;

public FileRecordReader(
FormatReaderFactory readerFactory,
FormatReaderFactory.Context context,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo,
boolean ignoreDelete)
@Nullable PartitionInfo partitionInfo)
throws IOException {
try {
this.reader = readerFactory.createReader(context);
Expand All @@ -60,7 +57,6 @@ public FileRecordReader(
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
this.ignoreDelete = ignoreDelete;
}

@Nullable
Expand Down Expand Up @@ -90,10 +86,6 @@ public RecordReader.RecordIterator<InternalRow> readBatch() throws IOException {
iterator = iterator.transform(castedRow::replaceRow);
}

if (ignoreDelete) {
iterator = iterator.filter(row -> RowKind.fromByteValue(row.getByte(1)).isAdd());
}

return iterator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,18 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
private final RecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
private final boolean ignoreDelete;

public KeyValueDataFileRecordReader(
RecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
RecordReader<InternalRow> reader,
RowType keyType,
RowType valueType,
int level,
boolean ignoreDelete) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.ignoreDelete = ignoreDelete;
}

@Nullable
Expand All @@ -50,11 +56,14 @@ public RecordIterator<KeyValue> readBatch() throws IOException {
return null;
}

return iterator.transform(
internalRow ->
internalRow == null
? null
: serializer.fromRow(internalRow).setLevel(level));
RecordIterator<KeyValue> transformed =
iterator.transform(
internalRow ->
internalRow == null
? null
: serializer.fromRow(internalRow).setLevel(level));

return ignoreDelete ? transformed.filter(KeyValue::isAdd) : transformed;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,20 @@ private RecordReader<KeyValue> createRecordReader(
fileIO, filePath, fileSize, orcPoolSize),
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
CoreOptions.fromMap(schema.options()).ignoreDelete());
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));

Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
fileRecordReader =
new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
}

return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
return new KeyValueDataFileRecordReader(
fileRecordReader,
keyType,
valueType,
level,
CoreOptions.fromMap(schema.options()).ignoreDelete());
}

public static Builder builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.operation;

import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
Expand Down Expand Up @@ -185,8 +184,7 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(
bulkFormatMapping.getPartitionPair(), partition),
CoreOptions.fromMap(schema.options()).ignoreDelete()));
bulkFormatMapping.getPartitionPair(), partition)));
}

return ConcatRecordReader.create(suppliers);
Expand Down

0 comments on commit ef3043e

Please sign in to comment.