[spark][core] Support input_file_name UDF (apache#3094)
YannByron authored and zhu3pang committed Mar 29, 2024
1 parent 096c433 commit cbe121b
Showing 12 changed files with 345 additions and 34 deletions.
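For context, input_file_name() is Spark's built-in function that returns the path of the file the current row was read from; before this change it typically yielded an empty string for Paimon scans, since nothing populated Spark's thread-local file-name holder. A minimal usage sketch in Java (the warehouse path and table name are hypothetical):

    import static org.apache.spark.sql.functions.input_file_name;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class InputFileNameExample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().getOrCreate();
            Dataset<Row> df =
                    spark.read().format("paimon").load("/tmp/warehouse/db.db/my_table");
            // Each row now reports the data file it was read from.
            df.withColumn("file", input_file_name()).show(false);
        }
    }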
ColumnarRowIterator.java

@@ -20,8 +20,9 @@

 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordWithPositionIterator;
 import org.apache.paimon.utils.RecyclableIterator;
 import org.apache.paimon.utils.VectorMappingUtils;

@@ -32,17 +33,19 @@
  * {@link ColumnarRow#setRowId}.
  */
 public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
-        implements RecordWithPositionIterator<InternalRow> {
+        implements FileRecordIterator<InternalRow> {

+    private final Path filePath;
     private final ColumnarRow rowData;
     private final Runnable recycler;

     private int num;
     private int nextPos;
     private long nextGlobalPos;

-    public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable recycler) {
+    public ColumnarRowIterator(Path filePath, ColumnarRow rowData, @Nullable Runnable recycler) {
         super(recycler);
+        this.filePath = filePath;
         this.rowData = rowData;
         this.recycler = recycler;
     }

@@ -74,8 +77,14 @@ public long returnedPosition() {
         return nextGlobalPos - 1;
     }

+    @Override
+    public Path filePath() {
+        return this.filePath;
+    }
+
     public ColumnarRowIterator copy(ColumnVector[] vectors) {
-        ColumnarRowIterator newIterator = new ColumnarRowIterator(rowData.copy(vectors), recycler);
+        ColumnarRowIterator newIterator =
+                new ColumnarRowIterator(filePath, rowData.copy(vectors), recycler);
         newIterator.reset(num, nextGlobalPos);
         return newIterator;
     }
RecordWithPositionIterator.java → FileRecordIterator.java (renamed)

@@ -18,6 +18,7 @@

 package org.apache.paimon.reader;

+import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.Filter;

 import javax.annotation.Nullable;

@@ -30,7 +31,7 @@
  *
  * @param <T> The type of the record.
  */
-public interface RecordWithPositionIterator<T> extends RecordReader.RecordIterator<T> {
+public interface FileRecordIterator<T> extends RecordReader.RecordIterator<T> {

     /**
      * Get the row position of the row returned by {@link RecordReader.RecordIterator#next}.
@@ -39,15 +40,23 @@ public interface RecordWithPositionIterator<T> extends RecordReader.RecordIterator<T> {
      */
     long returnedPosition();

+    /** @return the file path */
+    Path filePath();
+
     @Override
-    default <R> RecordWithPositionIterator<R> transform(Function<T, R> function) {
-        RecordWithPositionIterator<T> thisIterator = this;
-        return new RecordWithPositionIterator<R>() {
+    default <R> FileRecordIterator<R> transform(Function<T, R> function) {
+        FileRecordIterator<T> thisIterator = this;
+        return new FileRecordIterator<R>() {
             @Override
             public long returnedPosition() {
                 return thisIterator.returnedPosition();
             }

+            @Override
+            public Path filePath() {
+                return thisIterator.filePath();
+            }
+
             @Nullable
             @Override
             public R next() throws IOException {
@@ -66,14 +75,19 @@ public void releaseBatch() {
     }

     @Override
-    default RecordWithPositionIterator<T> filter(Filter<T> filter) {
-        RecordWithPositionIterator<T> thisIterator = this;
-        return new RecordWithPositionIterator<T>() {
+    default FileRecordIterator<T> filter(Filter<T> filter) {
+        FileRecordIterator<T> thisIterator = this;
+        return new FileRecordIterator<T>() {
             @Override
             public long returnedPosition() {
                 return thisIterator.returnedPosition();
             }

+            @Override
+            public Path filePath() {
+                return thisIterator.filePath();
+            }
+
             @Nullable
             @Override
             public T next() throws IOException {
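The new contract is easiest to see from a caller's perspective: next() walks the rows of one batch, while returnedPosition() and filePath() report where the current row lives. A hedged consumer sketch (the drain helper is hypothetical; the iterator methods are the ones defined above):

    import java.io.IOException;

    import org.apache.paimon.reader.FileRecordIterator;

    class BatchDrainer {
        // Prints every record in the batch with its file position and source file.
        static <T> void drain(FileRecordIterator<T> batch) throws IOException {
            try {
                T record;
                while ((record = batch.next()) != null) {
                    System.out.printf(
                            "%s @ row %d: %s%n",
                            batch.filePath(), batch.returnedPosition(), record);
                }
            } finally {
                batch.releaseBatch(); // return the backing buffers to their pool
            }
        }
    }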
RecordReader.java

@@ -149,11 +149,11 @@ default void forEachRemaining(Consumer<? super T> action) throws IOException {
      */
     default void forEachRemainingWithPosition(BiConsumer<Long, ? super T> action)
             throws IOException {
-        RecordWithPositionIterator<T> batch;
+        FileRecordIterator<T> batch;
         T record;

         try {
-            while ((batch = (RecordWithPositionIterator<T>) readBatch()) != null) {
+            while ((batch = (FileRecordIterator<T>) readBatch()) != null) {
                 while ((record = batch.next()) != null) {
                     action.accept(batch.returnedPosition(), record);
                 }
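A hedged usage sketch of the default method above (the scan wrapper is hypothetical; forEachRemainingWithPosition is the method shown in this hunk):

    import java.io.IOException;

    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.reader.RecordReader;

    class PositionedScan {
        static void scan(RecordReader<InternalRow> reader) throws IOException {
            try (RecordReader<InternalRow> r = reader) {
                // The BiConsumer gets the row's position within its source file, then the row.
                r.forEachRemainingWithPosition(
                        (pos, row) -> System.out.println("position " + pos + ": " + row));
            }
        }
    }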
ApplyDeletionVectorReader.java

@@ -18,8 +18,8 @@

 package org.apache.paimon.deletionvectors;

+import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordWithPositionIterator;

 import javax.annotation.Nullable;

@@ -62,10 +62,10 @@ public RecordIterator<T> readBatch() throws IOException {
         }

         checkArgument(
-                batch instanceof RecordWithPositionIterator,
+                batch instanceof FileRecordIterator,
                 "There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");

-        RecordWithPositionIterator<T> batchWithPosition = (RecordWithPositionIterator<T>) batch;
+        FileRecordIterator<T> batchWithPosition = (FileRecordIterator<T>) batch;

         return batchWithPosition.filter(
                 a -> !deletionVector.isDeleted(batchWithPosition.returnedPosition()));
LookupLevels.java

@@ -28,8 +28,8 @@

 import org.apache.paimon.lookup.LookupStoreWriter;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordWithPositionIterator;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BloomFilter;

@@ -176,9 +176,8 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException {
         try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
             KeyValue kv;
             if (valueProcessor.withPosition()) {
-                RecordWithPositionIterator<KeyValue> batch;
-                while ((batch = (RecordWithPositionIterator<KeyValue>) reader.readBatch())
-                        != null) {
+                FileRecordIterator<KeyValue> batch;
+                while ((batch = (FileRecordIterator<KeyValue>) reader.readBatch()) != null) {
                     while ((kv = batch.next()) != null) {
                         byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
                         byte[] valueBytes =
OrcReaderFactory.java

@@ -28,6 +28,7 @@

 import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
 import org.apache.paimon.format.orc.filter.OrcFilters;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;

@@ -94,7 +95,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context)
                 context instanceof OrcFormatReaderContext
                         ? ((OrcFormatReaderContext) context).poolSize()
                         : 1;
-        Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(poolSize);
+        Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(context.filePath(), poolSize);

         RecordReader orcReader =
                 createRecordReader(

@@ -114,7 +115,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context)
      * conversion from the ORC representation to the result format.
      */
     public OrcReaderBatch createReaderBatch(
-            VectorizedRowBatch orcBatch, Pool.Recycler<OrcReaderBatch> recycler) {
+            Path filePath, VectorizedRowBatch orcBatch, Pool.Recycler<OrcReaderBatch> recycler) {
         List<String> tableFieldNames = tableType.getFieldNames();
         List<DataType> tableFieldTypes = tableType.getFieldTypes();

@@ -125,17 +126,17 @@
             DataType type = tableFieldTypes.get(i);
             vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
         }
-        return new OrcReaderBatch(orcBatch, new VectorizedColumnBatch(vectors), recycler);
+        return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler);
     }

     // ------------------------------------------------------------------------

-    private Pool<OrcReaderBatch> createPoolOfBatches(int numBatches) {
+    private Pool<OrcReaderBatch> createPoolOfBatches(Path filePath, int numBatches) {
         final Pool<OrcReaderBatch> pool = new Pool<>(numBatches);

         for (int i = 0; i < numBatches; i++) {
             final VectorizedRowBatch orcBatch = createBatchWrapper(schema, batchSize / numBatches);
-            final OrcReaderBatch batch = createReaderBatch(orcBatch, pool.recycler());
+            final OrcReaderBatch batch = createReaderBatch(filePath, orcBatch, pool.recycler());
             pool.add(batch);
         }

@@ -153,14 +154,16 @@ private static class OrcReaderBatch {
         private final ColumnarRowIterator result;

         protected OrcReaderBatch(
+                final Path filePath,
                 final VectorizedRowBatch orcVectorizedRowBatch,
                 final VectorizedColumnBatch paimonColumnBatch,
                 final Pool.Recycler<OrcReaderBatch> recycler) {
             this.orcVectorizedRowBatch = checkNotNull(orcVectorizedRowBatch);
             this.recycler = checkNotNull(recycler);
             this.paimonColumnBatch = paimonColumnBatch;
             this.result =
-                    new ColumnarRowIterator(new ColumnarRow(paimonColumnBatch), this::recycle);
+                    new ColumnarRowIterator(
+                            filePath, new ColumnarRow(paimonColumnBatch), this::recycle);
         }

         /**
ParquetReaderFactory.java

@@ -28,6 +28,7 @@

 import org.apache.paimon.format.parquet.reader.ColumnReader;
 import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
 import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReader.RecordIterator;

@@ -100,7 +101,8 @@ public ParquetReader createReader(FormatReaderFactory.Context context) throws IOException {

         checkSchema(fileSchema, requestedSchema);

-        Pool<ParquetReaderBatch> poolOfBatches = createPoolOfBatches(requestedSchema);
+        Pool<ParquetReaderBatch> poolOfBatches =
+                createPoolOfBatches(context.filePath(), requestedSchema);

         return new ParquetReader(reader, requestedSchema, reader.getRecordCount(), poolOfBatches);
     }

@@ -174,21 +176,24 @@ private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
         }
     }

-    private Pool<ParquetReaderBatch> createPoolOfBatches(MessageType requestedSchema) {
+    private Pool<ParquetReaderBatch> createPoolOfBatches(
+            Path filePath, MessageType requestedSchema) {
         // In a VectorizedColumnBatch, the dictionary will be lazied deserialized.
         // If there are multiple batches at the same time, there may be thread safety problems,
         // because the deserialization of the dictionary depends on some internal structures.
         // We need set poolCapacity to 1.
         Pool<ParquetReaderBatch> pool = new Pool<>(1);
-        pool.add(createReaderBatch(requestedSchema, pool.recycler()));
+        pool.add(createReaderBatch(filePath, requestedSchema, pool.recycler()));
         return pool;
     }

     private ParquetReaderBatch createReaderBatch(
-            MessageType requestedSchema, Pool.Recycler<ParquetReaderBatch> recycler) {
+            Path filePath,
+            MessageType requestedSchema,
+            Pool.Recycler<ParquetReaderBatch> recycler) {
         WritableColumnVector[] writableVectors = createWritableVectors(requestedSchema);
         VectorizedColumnBatch columnarBatch = createVectorizedColumnBatch(writableVectors);
-        return createReaderBatch(writableVectors, columnarBatch, recycler);
+        return createReaderBatch(filePath, writableVectors, columnarBatch, recycler);
     }

     private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {

@@ -361,10 +366,11 @@ public void close() throws IOException {
     }

     private ParquetReaderBatch createReaderBatch(
+            Path filePath,
             WritableColumnVector[] writableVectors,
             VectorizedColumnBatch columnarBatch,
             Pool.Recycler<ParquetReaderBatch> recycler) {
-        return new ParquetReaderBatch(writableVectors, columnarBatch, recycler);
+        return new ParquetReaderBatch(filePath, writableVectors, columnarBatch, recycler);
     }

     private static class ParquetReaderBatch {

@@ -376,13 +382,16 @@ private static class ParquetReaderBatch {
         private final ColumnarRowIterator result;

         protected ParquetReaderBatch(
+                Path filePath,
                 WritableColumnVector[] writableVectors,
                 VectorizedColumnBatch columnarBatch,
                 Pool.Recycler<ParquetReaderBatch> recycler) {
             this.writableVectors = writableVectors;
             this.columnarBatch = columnarBatch;
             this.recycler = recycler;
-            this.result = new ColumnarRowIterator(new ColumnarRow(columnarBatch), this::recycle);
+            this.result =
+                    new ColumnarRowIterator(
+                            filePath, new ColumnarRow(columnarBatch), this::recycle);
         }

         public void recycle() {
PaimonPartitionReader.scala

@@ -40,7 +40,7 @@ case class PaimonPartitionReader(

   private lazy val iterator = {
     val reader = readFunc(split)
-    new RecordReaderIterator[PaimonInternalRow](reader)
+    PaimonRecordReaderIterator(reader)
   }

   override def next(): Boolean = {
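This Spark-side swap is what ultimately feeds input_file_name(): Spark resolves that function from the thread-local org.apache.spark.rdd.InputFileBlockHolder, which the reading iterator must refresh whenever it crosses a file boundary. The committed PaimonRecordReaderIterator is not part of this excerpt, so the sketch below is only an assumption about the mechanism (class and method names are hypothetical):

    import org.apache.paimon.fs.Path;
    import org.apache.paimon.reader.FileRecordIterator;
    import org.apache.spark.rdd.InputFileBlockHolder;

    class InputFileTracker {
        private Path lastFilePath;

        // Call once per batch: refresh Spark's thread-local when the source file changes.
        <T> void advance(FileRecordIterator<T> batch) {
            Path current = batch.filePath();
            if (!current.equals(lastFilePath)) {
                // Offsets 0 and -1 stand for "whole file"; exact block offsets are unknown here.
                InputFileBlockHolder.set(current.toUri().toString(), 0, -1);
                lastFilePath = current;
            }
        }
    }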
(The diff for the remaining 4 changed files did not load and is not shown here.)
