[core] Support dv with avro format (#3105)
Zouxxyy authored Mar 27, 2024
1 parent 3237e1a commit 629edfe
Showing 6 changed files with 78 additions and 11 deletions.
@@ -27,7 +27,8 @@
import java.util.function.Function;

/**
* Wrap {@link RecordReader.RecordIterator} to support returning the record's row position.
* Wrap {@link RecordReader.RecordIterator} to support returning the record's row position and file
* Path.
*
* @param <T> The type of the record.
*/
@@ -18,29 +18,51 @@

package org.apache.paimon.utils;

import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;

import javax.annotation.Nullable;

import java.util.Iterator;

/** A simple {@link RecordReader.RecordIterator} that returns the elements of an iterator. */
public final class IteratorResultIterator<E> extends RecyclableIterator<E> {
public final class IteratorResultIterator<E> extends RecyclableIterator<E>
implements FileRecordIterator<E> {

private final Iterator<E> records;
private final Path filePath;
private long nextFilePos;

public IteratorResultIterator(final Iterator<E> records, final @Nullable Runnable recycler) {
public IteratorResultIterator(
final Iterator<E> records,
final @Nullable Runnable recycler,
final Path filePath,
long pos) {
super(recycler);
this.records = records;
this.filePath = filePath;
this.nextFilePos = pos;
}

@Nullable
@Override
public E next() {
if (records.hasNext()) {
nextFilePos++;
return records.next();
} else {
return null;
}
}

@Override
public long returnedPosition() {
return nextFilePos - 1;
}

@Override
public Path filePath() {
return filePath;
}
}
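
A side note for readers, not part of the commit: the bookkeeping above advances nextFilePos on every next() and reports the row just returned as nextFilePos - 1. A minimal standalone sketch of that arithmetic, with a made-up class name, record values, and starting offset (plain Java, no Paimon types):

import java.util.Iterator;
import java.util.List;

public class PositionTrackingSketch {
    public static void main(String[] args) {
        long nextFilePos = 100; // assume earlier batches already covered file rows 0..99
        Iterator<String> records = List.of("a", "b", "c").iterator();
        while (records.hasNext()) {
            String record = records.next();
            nextFilePos++;                           // advance first, as next() does above
            long returnedPosition = nextFilePos - 1; // position of the row just returned
            System.out.println(record + " -> file row position " + returnedPosition);
        }
    }
}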
@@ -49,8 +49,6 @@
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.FileFormatType.ORC;
import static org.apache.paimon.CoreOptions.FileFormatType.PARQUET;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
@@ -473,10 +471,6 @@ private static void validateForDeletionVectors(TableSchema schema, CoreOptions o
!schema.primaryKeys().isEmpty(),
"Deletion vectors mode is only supported for tables with primary keys.");

checkArgument(
options.formatType().equals(ORC) || options.formatType().equals(PARQUET),
"Deletion vectors mode is only supported for orc or parquet file format now.");

checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
|| options.changelogProducer() == ChangelogProducer.LOOKUP,
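
Context for the removed check: with the ORC/Parquet restriction gone, a primary-key table that uses the Avro format should now pass validateForDeletionVectors. A hedged sketch of such a table configuration; the option keys ("file.format", "deletion-vectors.enabled", "changelog-producer") are assumed from Paimon's documented CoreOptions and should be verified against the version in use:

import java.util.HashMap;
import java.util.Map;

public class AvroDeletionVectorConfigSketch {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("file.format", "avro");               // rejected before this commit
        tableOptions.put("deletion-vectors.enabled", "true");  // deletion vectors mode
        tableOptions.put("changelog-producer", "lookup");      // still must be none or lookup
        System.out.println(tableOptions);                      // primary keys are still required
    }
}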
@@ -61,6 +61,8 @@ private class AvroReader implements RecordReader<InternalRow> {

private final long end;
private final Pool<Object> pool;
private final Path filePath;
private long currentRowPosition;

private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException {
this.fileIO = fileIO;
@@ -69,6 +71,8 @@ private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException {
this.reader.sync(0);
this.pool = new Pool<>(1);
this.pool.add(new Object());
this.filePath = path;
this.currentRowPosition = 0;
}

private DataFileReader<InternalRow> createReaderFromPath(Path path, long fileSize)
@@ -101,8 +105,11 @@ public RecordIterator<InternalRow> readBatch() throws IOException {
return null;
}

long rowPosition = currentRowPosition;
currentRowPosition += reader.getBlockCount();
Iterator<InternalRow> iterator = new AvroBlockIterator(reader.getBlockCount(), reader);
return new IteratorResultIterator<>(iterator, () -> pool.recycler().recycle(ticket));
return new IteratorResultIterator<>(
iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition);
}

private boolean readNextBlock() throws IOException {
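
An aside on the batch arithmetic in readBatch() above: each batch receives the running position of its first row, and the counter then advances by the block's row count before the next batch is read. A standalone illustration with hypothetical block sizes (plain Java, no Paimon types):

public class BatchPositionSketch {
    public static void main(String[] args) {
        long[] blockCounts = {3, 5, 2}; // made-up Avro block row counts
        long currentRowPosition = 0;
        for (long blockCount : blockCounts) {
            long rowPosition = currentRowPosition; // handed to IteratorResultIterator as the batch start
            currentRowPosition += blockCount;      // the next batch starts right after this block
            System.out.println("batch covers file rows " + rowPosition + " .. " + (currentRowPosition - 1));
        }
    }
}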
@@ -18,19 +18,35 @@

package org.apache.paimon.format.avro;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.ArrayList;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for avro file format. */
public class AvroFileFormatTest {

@TempDir java.nio.file.Path tempPath;

private static AvroFileFormat fileFormat;

@BeforeAll
@@ -85,4 +101,31 @@ public void testSupportedComplexDataTypes() {
RowType rowType = new RowType(dataFields);
fileFormat.validateDataFields(rowType);
}

@Test
void testReadRowPosition() throws IOException {
RowType rowType = DataTypes.ROW(DataTypes.INT().notNull());
FileFormat format = new AvroFileFormat(new Options());

LocalFileIO fileIO = LocalFileIO.create();
Path file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString());

try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
FormatWriter writer = format.createWriterFactory(rowType).create(out, null);
for (int i = 0; i < 1000000; i++) {
writer.addElement(GenericRow.of(i));
}
writer.flush();
writer.finish();
}

try (RecordReader<InternalRow> reader =
format.createReaderFactory(rowType)
.createReader(
new FormatReaderContext(
fileIO, file, fileIO.getFileSize(file))); ) {
reader.forEachRemainingWithPosition(
(rowPosition, row) -> assertThat(row.getInt(0) == rowPosition).isTrue());
}
}
}
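
A note on the fixture size in testReadRowPosition: writing 1,000,000 rows should force the Avro writer to roll over multiple blocks, so the assertion row.getInt(0) == rowPosition exercises both the per-batch starting position and the per-record increment rather than a single block.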
@@ -101,7 +101,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
test("Paimon deletionVector: e2e random write") {
val bucket = Random.shuffle(Seq("-1", "1", "3")).head
val changelogProducer = Random.shuffle(Seq("none", "lookup")).head
val format = Random.shuffle(Seq("orc", "parquet")).head
val format = Random.shuffle(Seq("orc", "parquet", "avro")).head
val batchSize = Random.nextInt(1024) + 1

val dvTbl = "deletion_vector_tbl"
