Skip to content

Commit

Permalink
[arrow] Rename ArrowWriter to ArrowBatchConverter
Browse files: browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Aug 16, 2024
1 parent 974834d commit ad9be43
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 33 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.paimon.arrow.writer;
package org.apache.paimon.arrow.converter;

import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;

Expand All @@ -26,18 +27,18 @@
import javax.annotation.Nullable;

/**
* A reusable writer to convert Paimon data in {@link RecordReader.RecordIterator} into Arrow
* A reusable converter to convert Paimon data in {@link RecordReader.RecordIterator} into Arrow
* format.
*/
public abstract class ArrowWriter {
public abstract class ArrowBatchConverter {

// reusable
protected final VectorSchemaRoot root;
protected final ArrowFieldWriter[] fieldWriters;

protected RecordReader.RecordIterator<InternalRow> iterator;

ArrowWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) {
ArrowBatchConverter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) {
this.root = root;
this.fieldWriters = fieldWriters;
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.paimon.arrow.writer;
package org.apache.paimon.arrow.converter;

import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;

Expand All @@ -27,11 +28,11 @@
import java.io.UncheckedIOException;

/** Converts the rows of an iterator into Arrow format one row at a time. */
public class ArrowRowWriter extends ArrowWriter {
public class ArrowPerRowBatchConverter extends ArrowBatchConverter {

private InternalRow currentRow;

public ArrowRowWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) {
public ArrowPerRowBatchConverter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) {
super(root, fieldWriters);
}

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.paimon.arrow.writer;
package org.apache.paimon.arrow.converter;

import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
Expand All @@ -32,14 +33,14 @@
import javax.annotation.Nullable;

/** Converts a {@link VectorizedColumnBatch} to Arrow format. */
public class ArrowBatchWriter extends ArrowWriter {
public class ArrowVectorizedBatchConverter extends ArrowBatchConverter {

private VectorizedColumnBatch batch;
private @Nullable int[] pickedInColumn;
private int totalNumRows;
private int startIndex;

public ArrowBatchWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) {
public ArrowVectorizedBatchConverter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) {
super(root, fieldWriters);
}

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -16,10 +16,11 @@
* limitations under the License.
*/

package org.apache.paimon.arrow.writer;
package org.apache.paimon.arrow.converter;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
Expand Down Expand Up @@ -88,9 +89,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

/** UT for {@link ArrowWriter}. */
/** Unit tests for {@link ArrowBatchConverter}. */
@ExtendWith(ParameterizedTestExtension.class)
public class ArrowWriterTest {
public class ArrowBatchConverterTest {

private static final Random RND = ThreadLocalRandom.current();
private @TempDir java.nio.file.Path tempDir;
Expand Down Expand Up @@ -133,7 +134,7 @@ public class ArrowWriterTest {
PRIMITIVE_TYPE = new RowType(dataFields);
}

public ArrowWriterTest(String testMode) {
public ArrowBatchConverterTest(String testMode) {
this.testMode = testMode;
}

Expand Down Expand Up @@ -186,7 +187,7 @@ public void testPrimitiveTypes() throws Exception {
int numRows = expected.size();
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(PRIMITIVE_TYPE, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, PRIMITIVE_TYPE, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, PRIMITIVE_TYPE, vsr);
arrowWriter.next(numRows);
assertThat(vsr.getRowCount()).isEqualTo(numRows);

Expand Down Expand Up @@ -246,7 +247,7 @@ public void testArrayType() throws Exception {
getRecordIterator(nestedArrayType, rows);
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr);
arrowWriter.next(numRows);
assertThat(vsr.getRowCount()).isEqualTo(numRows);

Expand Down Expand Up @@ -310,7 +311,7 @@ public void testMapType() throws Exception {
RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(nestedMapType, rows);
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr);
arrowWriter.next(numRows);
assertThat(vsr.getRowCount()).isEqualTo(numRows);

Expand Down Expand Up @@ -367,7 +368,7 @@ public void testMapRowType() throws Exception {
getRecordIterator(nestedMapRowType, Arrays.asList(row1, row2, row3));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapRowType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, nestedMapRowType, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapRowType, vsr);
arrowWriter.next(3);
assertThat(vsr.getRowCount()).isEqualTo(3);

Expand Down Expand Up @@ -425,7 +426,7 @@ private void testRowTypeImpl(boolean allNull) throws Exception {
RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(nestedRowType, rows);
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, nestedRowType, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedRowType, vsr);
arrowWriter.next(numRows);
assertThat(vsr.getRowCount()).isEqualTo(numRows);

Expand Down Expand Up @@ -466,7 +467,7 @@ public void testSliceIntType() throws Exception {
RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(rowType, rows);
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, rowType, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, rowType, vsr);

// write 3 times
arrowWriter.next(3);
Expand Down Expand Up @@ -521,7 +522,7 @@ public void testDvWithSimpleRowType() throws Exception {
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, rowType, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, rowType, vsr);
int expectedRowCount = rows.size() - deleted.size();
int averageBatchRows = RND.nextInt(expectedRowCount) + 1;
int readRows = 0;
Expand Down Expand Up @@ -591,7 +592,7 @@ public void testDvWithArrayType() throws Exception {
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr);
int expectedRowCount = rows.size() - deleted.size();
int averageBatchRows = RND.nextInt(expectedRowCount) + 1;
int readRows = 0;
Expand Down Expand Up @@ -669,7 +670,7 @@ public void testDvWithMapType() throws Exception {
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr);
int expectedRowCount = rows.size() - deleted.size();
int averageBatchRows = RND.nextInt(expectedRowCount) + 1;
int readRows = 0;
Expand Down Expand Up @@ -738,12 +739,13 @@ public void testDvWithRowType() throws Exception {
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, nestedRowType, vsr);
ArrowBatchConverter arrowBatchConverter =
createArrowWriter(iterator, nestedRowType, vsr);
int expectedRowCount = rows.size() - deleted.size();
int averageBatchRows = RND.nextInt(expectedRowCount) + 1;
int readRows = 0;
Set<Integer> readPks = new HashSet<>();
while ((vsr = arrowWriter.next(averageBatchRows)) != null) {
while ((vsr = arrowBatchConverter.next(averageBatchRows)) != null) {
readRows += vsr.getRowCount();
// validate current result
IntVector pkVector = (IntVector) vsr.getVector(0);
Expand All @@ -766,7 +768,7 @@ public void testDvWithRowType() throws Exception {
}
assertThat(readRows).isEqualTo(expectedRowCount);
assertThat(readPks).isEqualTo(expectedPks);
arrowWriter.close();
arrowBatchConverter.close();
}
}

Expand All @@ -791,12 +793,12 @@ private void testReadEmpty(
try (RootAllocator allocator = new RootAllocator()) {
RowType rowType = RowType.of();
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
ArrowWriter arrowWriter = createArrowWriter(iterator, rowType, vsr);
arrowWriter.next(expectedRowCount);
ArrowBatchConverter arrowBatchConverter = createArrowWriter(iterator, rowType, vsr);
arrowBatchConverter.next(expectedRowCount);
assertThat(vsr.getRowCount()).isEqualTo(expectedRowCount);
assertThat(vsr.getSchema().getFields()).isEmpty();
assertThat(vsr.getFieldVectors()).isEmpty();
arrowWriter.close();
arrowBatchConverter.close();
}
}

Expand Down Expand Up @@ -874,21 +876,23 @@ private FileStoreTable createFileStoreTable(
return (FileStoreTable) catalog.getTable(identifier);
}

private ArrowWriter createArrowWriter(
private ArrowBatchConverter createArrowWriter(
RecordReader.RecordIterator<InternalRow> iterator,
RowType rowType,
VectorSchemaRoot vsr) {
ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType);
if (testMode.equals("vectorized_without_dv")) {
ArrowBatchWriter batchWriter = new ArrowBatchWriter(vsr, fieldWriters);
ArrowVectorizedBatchConverter batchWriter =
new ArrowVectorizedBatchConverter(vsr, fieldWriters);
batchWriter.reset((VectorizedRecordIterator) iterator);
return batchWriter;
} else if (testMode.equals("vectorized_with_dv")) {
ArrowBatchWriter batchWriter = new ArrowBatchWriter(vsr, fieldWriters);
ArrowVectorizedBatchConverter batchWriter =
new ArrowVectorizedBatchConverter(vsr, fieldWriters);
batchWriter.reset((ApplyDeletionFileRecordIterator) iterator);
return batchWriter;
} else {
ArrowRowWriter rowWriter = new ArrowRowWriter(vsr, fieldWriters);
ArrowPerRowBatchConverter rowWriter = new ArrowPerRowBatchConverter(vsr, fieldWriters);
rowWriter.reset(iterator);
return rowWriter;
}
Expand Down

0 comments on commit ad9be43

Please sign in to comment.