From 629edfed75622f19fef55d620660530970118c94 Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Wed, 27 Mar 2024 16:26:41 +0800 Subject: [PATCH] [core] Support dv with avro format (#3105) --- .../paimon/reader/FileRecordIterator.java | 3 +- .../paimon/utils/IteratorResultIterator.java | 26 ++++++++++- .../paimon/schema/SchemaValidation.java | 6 --- .../paimon/format/avro/AvroBulkFormat.java | 9 +++- .../format/avro/AvroFileFormatTest.java | 43 +++++++++++++++++++ .../paimon/spark/sql/DeletionVectorTest.scala | 2 +- 6 files changed, 78 insertions(+), 11 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java index 0cef8cc001e6..d22b27053f98 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java @@ -27,7 +27,8 @@ import java.util.function.Function; /** - * Wrap {@link RecordReader.RecordIterator} to support returning the record's row position. + * Wrap {@link RecordReader.RecordIterator} to support returning the record's row position and file + * Path. * * @param The type of the record. */ diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java b/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java index a2bffd31da05..cb42a371fa2f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java @@ -18,6 +18,8 @@ package org.apache.paimon.utils; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.RecordReader; import javax.annotation.Nullable; @@ -25,22 +27,42 @@ import java.util.Iterator; /** A simple {@link RecordReader.RecordIterator} that returns the elements of an iterator. */ -public final class IteratorResultIterator extends RecyclableIterator { +public final class IteratorResultIterator extends RecyclableIterator + implements FileRecordIterator { private final Iterator records; + private final Path filePath; + private long nextFilePos; - public IteratorResultIterator(final Iterator records, final @Nullable Runnable recycler) { + public IteratorResultIterator( + final Iterator records, + final @Nullable Runnable recycler, + final Path filePath, + long pos) { super(recycler); this.records = records; + this.filePath = filePath; + this.nextFilePos = pos; } @Nullable @Override public E next() { if (records.hasNext()) { + nextFilePos++; return records.next(); } else { return null; } } + + @Override + public long returnedPosition() { + return nextFilePos - 1; + } + + @Override + public Path filePath() { + return filePath; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 7e59bb7b51b8..18c95cd2f048 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -49,8 +49,6 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; -import static org.apache.paimon.CoreOptions.FileFormatType.ORC; -import static org.apache.paimon.CoreOptions.FileFormatType.PARQUET; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP; import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS; @@ -473,10 +471,6 @@ private static void validateForDeletionVectors(TableSchema schema, CoreOptions o !schema.primaryKeys().isEmpty(), "Deletion vectors mode is only supported for tables with primary keys."); - checkArgument( - options.formatType().equals(ORC) || options.formatType().equals(PARQUET), - "Deletion vectors mode is only supported for orc or parquet file format now."); - checkArgument( options.changelogProducer() == ChangelogProducer.NONE || options.changelogProducer() == ChangelogProducer.LOOKUP, diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java index abf82342a5aa..0838912141c1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java @@ -61,6 +61,8 @@ private class AvroReader implements RecordReader { private final long end; private final Pool pool; + private final Path filePath; + private long currentRowPosition; private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException { this.fileIO = fileIO; @@ -69,6 +71,8 @@ private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException { this.reader.sync(0); this.pool = new Pool<>(1); this.pool.add(new Object()); + this.filePath = path; + this.currentRowPosition = 0; } private DataFileReader createReaderFromPath(Path path, long fileSize) @@ -101,8 +105,11 @@ public RecordIterator readBatch() throws IOException { return null; } + long rowPosition = currentRowPosition; + currentRowPosition += reader.getBlockCount(); Iterator iterator = new AvroBlockIterator(reader.getBlockCount(), reader); - return new IteratorResultIterator<>(iterator, () -> pool.recycler().recycle(ticket)); + return new IteratorResultIterator<>( + iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition); } private boolean readNextBlock() throws IOException { diff --git a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java index 0aa15f7f227a..e9caf3b24b91 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java @@ -18,19 +18,35 @@ package org.apache.paimon.format.avro; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FormatReaderContext; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.util.ArrayList; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; /** Test for avro file format. */ public class AvroFileFormatTest { + @TempDir java.nio.file.Path tempPath; + private static AvroFileFormat fileFormat; @BeforeAll @@ -85,4 +101,31 @@ public void testSupportedComplexDataTypes() { RowType rowType = new RowType(dataFields); fileFormat.validateDataFields(rowType); } + + @Test + void testReadRowPosition() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT().notNull()); + FileFormat format = new AvroFileFormat(new Options()); + + LocalFileIO fileIO = LocalFileIO.create(); + Path file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString()); + + try (PositionOutputStream out = fileIO.newOutputStream(file, false)) { + FormatWriter writer = format.createWriterFactory(rowType).create(out, null); + for (int i = 0; i < 1000000; i++) { + writer.addElement(GenericRow.of(i)); + } + writer.flush(); + writer.finish(); + } + + try (RecordReader reader = + format.createReaderFactory(rowType) + .createReader( + new FormatReaderContext( + fileIO, file, fileIO.getFileSize(file))); ) { + reader.forEachRemainingWithPosition( + (rowPosition, row) -> assertThat(row.getInt(0) == rowPosition).isTrue()); + } + } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala index 887b6c8dfe61..f45a22caee2d 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala @@ -101,7 +101,7 @@ class DeletionVectorTest extends PaimonSparkTestBase { test("Paimon deletionVector: e2e random write") { val bucket = Random.shuffle(Seq("-1", "1", "3")).head val changelogProducer = Random.shuffle(Seq("none", "lookup")).head - val format = Random.shuffle(Seq("orc", "parquet")).head + val format = Random.shuffle(Seq("orc", "parquet", "avro")).head val batchSize = Random.nextInt(1024) + 1 val dvTbl = "deletion_vector_tbl"