From f11768e0a9d5399f35a1e29027c41dfa44d6d120 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Thu, 15 Aug 2024 16:51:46 +0800 Subject: [PATCH 1/6] [arrow] Introduce paimon-arrow module to provide utils to convert paimon Row to Arrow FieldVector --- paimon-arrow/pom.xml | 64 ++ .../apache/paimon/arrow/ArrowBatchWriter.java | 95 +++ .../apache/paimon/arrow/ArrowFieldWriter.java | 86 ++ .../paimon/arrow/ArrowFieldWriterFactory.java | 28 + .../paimon/arrow/ArrowFieldWriters.java | 783 ++++++++++++++++++ .../apache/paimon/arrow/ArrowRowWriter.java | 67 ++ .../org/apache/paimon/arrow/ArrowUtils.java | 94 +++ .../org/apache/paimon/arrow/ArrowWriter.java | 74 ++ .../paimon/arrow/ToFieldTypeVisitor.java | 158 ++++ pom.xml | 1 + 10 files changed, 1450 insertions(+) create mode 100644 paimon-arrow/pom.xml create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBatchWriter.java create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriter.java create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriterFactory.java create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriters.java create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowRowWriter.java create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowWriter.java create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ToFieldTypeVisitor.java diff --git a/paimon-arrow/pom.xml b/paimon-arrow/pom.xml new file mode 100644 index 000000000000..2f5c12ade2b4 --- /dev/null +++ b/paimon-arrow/pom.xml @@ -0,0 +1,64 @@ + + + + 4.0.0 + + + paimon-parent + org.apache.paimon + 0.9-SNAPSHOT + + + paimon-arrow + Paimon : Arrow + + + 14.0.0 + + + + + + + org.apache.paimon + paimon-core + ${project.version} + provided + + + + org.apache.paimon + paimon-common + ${project.version} + provided + + + + org.apache.arrow + arrow-vector + ${arrow.version} + provided + + + + + diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBatchWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBatchWriter.java new file mode 100644 index 000000000000..8bb963929b2d --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBatchWriter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.columnar.ColumnVector; +import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator; +import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.VectorizedRecordIterator; +import org.apache.paimon.utils.IntArrayList; + +import org.apache.arrow.vector.VectorSchemaRoot; + +import javax.annotation.Nullable; + +/** To convert {@link VectorizedColumnBatch} to Arrow format. */ +public class ArrowBatchWriter extends ArrowWriter { + + private VectorizedColumnBatch batch; + private @Nullable int[] pickedInColumn; + private int totalNumRows; + private int startIndex; + + public ArrowBatchWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) { + super(root, fieldWriters); + } + + @Override + public void doWrite(int maxBatchRows) { + int batchRows = Math.min(maxBatchRows, totalNumRows - startIndex); + ColumnVector[] columns = batch.columns; + for (int i = 0; i < columns.length; i++) { + fieldWriters[i].write(columns[i], pickedInColumn, startIndex, batchRows); + } + root.setRowCount(batchRows); + + startIndex += batchRows; + if (startIndex >= totalNumRows) { + releaseIterator(); + } + } + + public void reset(ApplyDeletionFileRecordIterator iterator) { + this.iterator = iterator; + + FileRecordIterator innerIterator = iterator.iterator(); + this.batch = ((VectorizedRecordIterator) innerIterator).batch(); + + long firstReturnedPosition = innerIterator.returnedPosition() + 1; + DeletionVector deletionVector = iterator.deletionVector(); + int originNumRows = this.batch.getNumRows(); + IntArrayList picked = new IntArrayList(originNumRows); + for (int i = 0; i < originNumRows; i++) { + long returnedPosition = firstReturnedPosition + i; + if (!deletionVector.isDeleted(returnedPosition)) { + picked.add(i); + } + } + if (picked.size() == originNumRows) { + this.pickedInColumn = null; + this.totalNumRows = originNumRows; + } else { + this.pickedInColumn = picked.toArray(); + this.totalNumRows = this.pickedInColumn.length; + } + + this.startIndex = 0; + } + + public void reset(VectorizedRecordIterator iterator) { + this.iterator = iterator; + this.batch = iterator.batch(); + this.pickedInColumn = null; + this.totalNumRows = this.batch.getNumRows(); + this.startIndex = 0; + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriter.java new file mode 100644 index 000000000000..5b7d32c4375a --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.paimon.data.DataGetters; +import org.apache.paimon.data.columnar.ColumnVector; + +import org.apache.arrow.vector.FieldVector; + +import javax.annotation.Nullable; + +/** A reusable writer to convert a field into Arrow {@link FieldVector}. */ +public abstract class ArrowFieldWriter { + + // reusable + protected final FieldVector fieldVector; + + public ArrowFieldWriter(FieldVector fieldVector) { + this.fieldVector = fieldVector; + } + + /** Reset the state of the writer to write the next batch of fields. */ + public void reset() { + fieldVector.reset(); + } + + /** + * Write all data of a {@link ColumnVector}. + * + * @param columnVector Which holds the paimon data. + * @param pickedInColumn Which rows is picked to write. Pick all if null. This is used to adapt + * deletion vector. + * @param startIndex From where to start writing. + * @param batchRows How many rows to write. + */ + public void write( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + doWrite(columnVector, pickedInColumn, startIndex, batchRows); + fieldVector.setValueCount(batchRows); + } + + protected abstract void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows); + + /** Get the value from the row at the given position and write to specified row index. */ + public void write(int rowIndex, DataGetters getters, int pos) { + if (getters.isNullAt(pos)) { + fieldVector.setNull(rowIndex); + } else { + doWrite(rowIndex, getters, pos); + } + fieldVector.setValueCount(fieldVector.getValueCount() + 1); + } + + protected abstract void doWrite(int rowIndex, DataGetters getters, int pos); + + protected int getRowNumber(int startIndex, int currentIndex, @Nullable int[] pickedInColumn) { + int row = currentIndex + startIndex; + if (pickedInColumn != null) { + row = pickedInColumn[row]; + } + return row; + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriterFactory.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriterFactory.java new file mode 100644 index 000000000000..6b1672cf18e5 --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriterFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.arrow.vector.FieldVector; + +/** Factory to create {@link ArrowFieldWriter}. */ +@FunctionalInterface +public interface ArrowFieldWriterFactory { + + ArrowFieldWriter create(FieldVector fieldVector); +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriters.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriters.java new file mode 100644 index 000000000000..38de5a13eacc --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriters.java @@ -0,0 +1,783 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.paimon.data.DataGetters; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.columnar.ArrayColumnVector; +import org.apache.paimon.data.columnar.BooleanColumnVector; +import org.apache.paimon.data.columnar.ByteColumnVector; +import org.apache.paimon.data.columnar.BytesColumnVector; +import org.apache.paimon.data.columnar.ColumnVector; +import org.apache.paimon.data.columnar.DecimalColumnVector; +import org.apache.paimon.data.columnar.DoubleColumnVector; +import org.apache.paimon.data.columnar.FloatColumnVector; +import org.apache.paimon.data.columnar.IntColumnVector; +import org.apache.paimon.data.columnar.LongColumnVector; +import org.apache.paimon.data.columnar.MapColumnVector; +import org.apache.paimon.data.columnar.RowColumnVector; +import org.apache.paimon.data.columnar.ShortColumnVector; +import org.apache.paimon.data.columnar.TimestampColumnVector; +import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.utils.IntArrayList; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; + +import javax.annotation.Nullable; + +import java.math.BigDecimal; +import java.time.Instant; +import java.time.ZoneId; + +/** Registry of {@link ArrowFieldWriter}s. */ +public class ArrowFieldWriters { + + /** Writer for CHAR & VARCHAR. */ + public static class StringWriter extends ArrowFieldWriter { + + public StringWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + VarCharVector varCharVector = (VarCharVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + varCharVector.setNull(row); + } else { + byte[] value = ((BytesColumnVector) columnVector).getBytes(row).getBytes(); + varCharVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((VarCharVector) fieldVector).setSafe(rowIndex, getters.getString(pos).toBytes()); + } + } + + /** Writer for BOOLEAN. */ + public static class BooleanWriter extends ArrowFieldWriter { + + public BooleanWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + BitVector bitVector = (BitVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + bitVector.setNull(row); + } else { + int value = ((BooleanColumnVector) columnVector).getBoolean(row) ? 1 : 0; + bitVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((BitVector) fieldVector).setSafe(rowIndex, getters.getBoolean(pos) ? 1 : 0); + } + } + + /** Writer for BINARY & VARBINARY. */ + public static class BinaryWriter extends ArrowFieldWriter { + + public BinaryWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + varBinaryVector.setNull(i); + } else { + byte[] value = ((BytesColumnVector) columnVector).getBytes(row).getBytes(); + varBinaryVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((VarBinaryVector) fieldVector).setSafe(rowIndex, getters.getBinary(pos)); + } + } + + /** Writer for DECIMAL. */ + public static class DecimalWriter extends ArrowFieldWriter { + + private final int precision; + private final int scale; + + public DecimalWriter(FieldVector fieldVector, int precision, int scale) { + super(fieldVector); + this.precision = precision; + this.scale = scale; + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + DecimalVector decimalVector = (DecimalVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + decimalVector.setNull(i); + } else { + BigDecimal value = + ((DecimalColumnVector) columnVector) + .getDecimal(row, precision, scale) + .toBigDecimal(); + decimalVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((DecimalVector) fieldVector) + .setSafe(rowIndex, getters.getDecimal(pos, precision, scale).toBigDecimal()); + } + } + + /** Writer for TINYINT. */ + public static class TinyIntWriter extends ArrowFieldWriter { + + public TinyIntWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + TinyIntVector tinyIntVector = (TinyIntVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + tinyIntVector.setNull(i); + } else { + byte value = ((ByteColumnVector) columnVector).getByte(row); + tinyIntVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((TinyIntVector) fieldVector).setSafe(rowIndex, getters.getByte(pos)); + } + } + + /** Writer for SMALLINT. */ + public static class SmallIntWriter extends ArrowFieldWriter { + + public SmallIntWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + SmallIntVector smallIntVector = (SmallIntVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + smallIntVector.setNull(i); + } else { + short value = ((ShortColumnVector) columnVector).getShort(row); + smallIntVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((SmallIntVector) fieldVector).setSafe(rowIndex, getters.getShort(pos)); + } + } + + /** Writer for INT. */ + public static class IntWriter extends ArrowFieldWriter { + + public IntWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + IntVector intVector = (IntVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + intVector.setNull(i); + } else { + int value = ((IntColumnVector) columnVector).getInt(row); + intVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((IntVector) fieldVector).setSafe(rowIndex, getters.getInt(pos)); + } + } + + /** Writer for BIGINT. */ + public static class BigIntWriter extends ArrowFieldWriter { + + public BigIntWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, int[] pickedInColumn, int startIndex, int batchRows) { + BigIntVector bigIntVector = (BigIntVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + bigIntVector.setNull(i); + } else { + long value = ((LongColumnVector) columnVector).getLong(row); + bigIntVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((BigIntVector) fieldVector).setSafe(rowIndex, getters.getLong(pos)); + } + } + + /** Writer for FLOAT. */ + public static class FloatWriter extends ArrowFieldWriter { + + public FloatWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + Float4Vector float4Vector = (Float4Vector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + float4Vector.setNull(i); + } else { + float value = ((FloatColumnVector) columnVector).getFloat(row); + float4Vector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((Float4Vector) fieldVector).setSafe(rowIndex, getters.getFloat(pos)); + } + } + + /** Writer for DOUBLE. */ + public static class DoubleWriter extends ArrowFieldWriter { + + public DoubleWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + Float8Vector float8Vector = (Float8Vector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + float8Vector.setNull(i); + } else { + double value = ((DoubleColumnVector) columnVector).getDouble(row); + float8Vector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((Float8Vector) fieldVector).setSafe(rowIndex, getters.getDouble(pos)); + } + } + + /** Writer for DATE. */ + public static class DateWriter extends ArrowFieldWriter { + + public DateWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + DateDayVector dateDayVector = (DateDayVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + dateDayVector.setNull(i); + } else { + int value = ((IntColumnVector) columnVector).getInt(row); + dateDayVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((DateDayVector) fieldVector).setSafe(rowIndex, getters.getInt(pos)); + } + } + + /** Writer for TIME. */ + public static class TimeWriter extends ArrowFieldWriter { + + public TimeWriter(FieldVector fieldVector) { + super(fieldVector); + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + TimeMilliVector timeMilliVector = (TimeMilliVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + timeMilliVector.setNull(i); + } else { + int value = ((IntColumnVector) columnVector).getInt(i); + timeMilliVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + ((TimeMilliVector) fieldVector).setSafe(rowIndex, getters.getInt(pos)); + } + } + + /** Writer for TIMESTAMP & TIMESTAMP_LTZ. */ + public static class TimestampWriter extends ArrowFieldWriter { + + private final int precision; + @Nullable private final ZoneId castZoneId; + + public TimestampWriter( + FieldVector fieldVector, int precision, @Nullable ZoneId castZoneId) { + super(fieldVector); + this.precision = precision; + this.castZoneId = castZoneId; + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + TimeStampNanoVector timeStampNanoVector = (TimeStampNanoVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + timeStampNanoVector.setNull(i); + } else { + Timestamp timestamp = + ((TimestampColumnVector) columnVector).getTimestamp(row, precision); + long value = timestampToEpochNano(timestamp); + timeStampNanoVector.setSafe(i, value); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + TimeStampNanoVector timeStampNanoVector = (TimeStampNanoVector) fieldVector; + Timestamp timestamp = getters.getTimestamp(pos, precision); + long value = timestampToEpochNano(timestamp); + timeStampNanoVector.setSafe(rowIndex, value); + } + + private long timestampToEpochNano(Timestamp timestamp) { + if (castZoneId != null) { + Instant instant = timestamp.toLocalDateTime().atZone(castZoneId).toInstant(); + return instant.getEpochSecond() * 1_000_000_000 + instant.getNano(); + } else { + return timestamp.getMillisecond() * 1_000_000 + timestamp.getNanoOfMillisecond(); + } + } + } + + /** Writer for ARRAY. */ + public static class ArrayWriter extends ArrowFieldWriter { + + private final ArrowFieldWriter elementWriter; + + public ArrayWriter(FieldVector fieldVector, ArrowFieldWriter elementWriter) { + super(fieldVector); + this.elementWriter = elementWriter; + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + ArrayColumnVector arrayColumnVector = (ArrayColumnVector) columnVector; + + int lenSize; + if (pickedInColumn == null) { + lenSize = startIndex + batchRows; + } else { + lenSize = pickedInColumn[startIndex + batchRows - 1] + 1; + } + + // length for arrays in [0, startIndex + batchRows) + // TODO: reuse this + int[] lengths = new int[lenSize]; + for (int i = 0; i < lenSize; i++) { + if (arrayColumnVector.isNullAt(i)) { + // null values don't occupy space + lengths[i] = 0; + } else { + int size = arrayColumnVector.getArray(i).size(); + lengths[i] = size; + } + } + + ArrayChildWriteInfo arrayChildWriteInfo = + getArrayChildWriteInfo(pickedInColumn, startIndex, lengths); + elementWriter.write( + arrayColumnVector.getColumnVector(), + arrayChildWriteInfo.pickedInColumn, + arrayChildWriteInfo.startIndex, + arrayChildWriteInfo.batchRows); + + // set ListVector + ListVector listVector = (ListVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (arrayColumnVector.isNullAt(row)) { + listVector.setNull(i); + } else { + listVector.startNewValue(i); + listVector.endValue(i, lengths[row]); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + InternalArray array = getters.getArray(pos); + ListVector listVector = (ListVector) fieldVector; + FieldVector dataVector = listVector.getDataVector(); + listVector.startNewValue(rowIndex); + int offset = dataVector.getValueCount(); + for (int arrIndex = 0; arrIndex < array.size(); arrIndex++) { + int fieldIndex = offset + arrIndex; + elementWriter.write(fieldIndex, array, arrIndex); + } + listVector.endValue(rowIndex, array.size()); + } + } + + /** Writer for MAP. */ + public static class MapWriter extends ArrowFieldWriter { + + private final ArrowFieldWriter keyWriter; + private final ArrowFieldWriter valueWriter; + + public MapWriter( + FieldVector fieldVector, ArrowFieldWriter keyWriter, ArrowFieldWriter valueWriter) { + super(fieldVector); + this.keyWriter = keyWriter; + this.valueWriter = valueWriter; + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + MapColumnVector mapColumnVector = (MapColumnVector) columnVector; + + int lenSize; + if (pickedInColumn == null) { + lenSize = startIndex + batchRows; + } else { + lenSize = pickedInColumn[startIndex + batchRows - 1] + 1; + } + + // length for arrays in [0, startIndex + batchRows) + // TODO: reuse this + int[] lengths = new int[lenSize]; + for (int i = 0; i < lenSize; i++) { + if (mapColumnVector.isNullAt(i)) { + // null values don't occupy space + lengths[i] = 0; + } else { + int size = mapColumnVector.getMap(i).size(); + lengths[i] = size; + } + } + + ArrayChildWriteInfo arrayChildWriteInfo = + getArrayChildWriteInfo(pickedInColumn, startIndex, lengths); + keyWriter.write( + mapColumnVector.getKeyColumnVector(), + arrayChildWriteInfo.pickedInColumn, + arrayChildWriteInfo.startIndex, + arrayChildWriteInfo.batchRows); + valueWriter.write( + mapColumnVector.getValueColumnVector(), + arrayChildWriteInfo.pickedInColumn, + arrayChildWriteInfo.startIndex, + arrayChildWriteInfo.batchRows); + + // set inner struct and map + MapVector mapVector = (MapVector) fieldVector; + StructVector innerStructVector = (StructVector) mapVector.getDataVector(); + for (int i = 0; i < arrayChildWriteInfo.batchRows; i++) { + innerStructVector.setIndexDefined(i); + } + + ListVector listVector = (ListVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (mapColumnVector.isNullAt(row)) { + listVector.setNull(i); + } else { + listVector.startNewValue(i); + listVector.endValue(i, lengths[row]); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + InternalMap map = getters.getMap(pos); + InternalArray keyArray = map.keyArray(); + InternalArray valueArray = map.valueArray(); + MapVector mapVector = (MapVector) fieldVector; + StructVector structVector = (StructVector) mapVector.getDataVector(); + + mapVector.startNewValue(rowIndex); + int offset = structVector.getValueCount(); + for (int mapIndex = 0; mapIndex < map.size(); mapIndex++) { + int fieldIndex = offset + mapIndex; + keyWriter.write(fieldIndex, keyArray, mapIndex); + valueWriter.write(fieldIndex, valueArray, mapIndex); + structVector.setIndexDefined(fieldIndex); + } + mapVector.endValue(rowIndex, map.size()); + } + } + + private static class ArrayChildWriteInfo { + @Nullable final int[] pickedInColumn; + final int startIndex; + final int batchRows; + + ArrayChildWriteInfo(@Nullable int[] pickedInColumn, int startIndex, int batchRows) { + this.pickedInColumn = pickedInColumn; + this.startIndex = startIndex; + this.batchRows = batchRows; + } + } + + private static ArrayChildWriteInfo getArrayChildWriteInfo( + @Nullable int[] pickedInParentColumn, int parentStartIndex, int[] parentLengths) { + return pickedInParentColumn == null + ? getArrayChildWriteInfoWithoutDelete(parentStartIndex, parentLengths) + : getArrayChildWriteInfoWithDelete( + pickedInParentColumn, parentStartIndex, parentLengths); + } + + private static ArrayChildWriteInfo getArrayChildWriteInfoWithoutDelete( + int parentStartIndex, int[] parentLengths) { + // the first element index which is to be written + int firstElementIndex = 0; + // batchRows of child column vector + int childBatchRows = 0; + for (int i = 0; i < parentLengths.length; i++) { + if (i < parentStartIndex) { + firstElementIndex += parentLengths[i]; + } else { + childBatchRows += parentLengths[i]; + } + } + return new ArrayChildWriteInfo(null, firstElementIndex, childBatchRows); + } + + private static ArrayChildWriteInfo getArrayChildWriteInfoWithDelete( + int[] pickedInParentColumn, int parentStartIndex, int[] parentLengths) { + // the first element index which is to be written + int firstElementIndex = 0; + // objects to calculate child pickedInColumn + IntArrayList childPicked = new IntArrayList(1024); + int offset = 0; + int currentParentPickedIndex = parentStartIndex; + for (int i = 0; i < parentLengths.length; i++) { + if (i < pickedInParentColumn[parentStartIndex]) { + firstElementIndex += parentLengths[i]; + offset = firstElementIndex; + } else { + if (i == pickedInParentColumn[currentParentPickedIndex]) { + for (int pick = 0; pick < parentLengths[i]; pick++) { + childPicked.add(pick + offset); + } + currentParentPickedIndex += 1; + } + offset += parentLengths[i]; + } + } + return new ArrayChildWriteInfo(childPicked.toArray(), 0, childPicked.size()); + } + + /** Writer for ROW. */ + public static class RowWriter extends ArrowFieldWriter { + + private final ArrowFieldWriter[] fieldWriters; + + public RowWriter(FieldVector fieldVector, ArrowFieldWriter[] fieldWriters) { + super(fieldVector); + this.fieldWriters = fieldWriters; + } + + @Override + protected void doWrite( + ColumnVector columnVector, + @Nullable int[] pickedInColumn, + int startIndex, + int batchRows) { + boolean[] isNull = new boolean[batchRows]; + boolean allNull = true; + for (int i = 0; i < batchRows; i++) { + int row = getRowNumber(startIndex, i, pickedInColumn); + if (columnVector.isNullAt(row)) { + isNull[i] = true; + } else { + allNull = false; + } + } + + RowColumnVector rowColumnVector = (RowColumnVector) columnVector; + VectorizedColumnBatch batch = rowColumnVector.getBatch(); + int nestedBatchRows = allNull ? 0 : batchRows; + for (int i = 0; i < fieldWriters.length; i++) { + fieldWriters[i].write( + batch.columns[i], pickedInColumn, startIndex, nestedBatchRows); + } + + StructVector structVector = (StructVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + if (isNull[i]) { + structVector.setNull(i); + } else { + structVector.setIndexDefined(i); + } + } + } + + @Override + protected void doWrite(int rowIndex, DataGetters getters, int pos) { + int fieldCount = fieldWriters.length; + InternalRow row = getters.getRow(pos, fieldCount); + StructVector structVector = (StructVector) fieldVector; + for (int i = 0; i < fieldWriters.length; i++) { + fieldWriters[i].write(rowIndex, row, i); + } + structVector.setIndexDefined(rowIndex); + } + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowRowWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowRowWriter.java new file mode 100644 index 000000000000..920b32270824 --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowRowWriter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; + +import org.apache.arrow.vector.VectorSchemaRoot; + +import java.io.IOException; +import java.io.UncheckedIOException; + +/** To convert iterator row by row. */ +public class ArrowRowWriter extends ArrowWriter { + + private InternalRow currentRow; + + public ArrowRowWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) { + super(root, fieldWriters); + } + + @Override + public void doWrite(int maxBatchRows) { + int rowIndex = 0; + while (currentRow != null && rowIndex < maxBatchRows) { + for (int i = 0; i < fieldWriters.length; i++) { + fieldWriters[i].write(rowIndex, currentRow, i); + } + nextRow(); + rowIndex++; + } + root.setRowCount(rowIndex); + + if (currentRow == null) { + releaseIterator(); + } + } + + public void reset(RecordReader.RecordIterator iterator) { + this.iterator = iterator; + nextRow(); + } + + private void nextRow() { + try { + currentRow = iterator.next(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java new file mode 100644 index 000000000000..a9da0f782bc1 --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** Utilities for creating Arrow objects. */ +public class ArrowUtils { + + public static VectorSchemaRoot createVectorSchemaRoot( + RowType rowType, BufferAllocator allocator) { + return createVectorSchemaRoot(rowType, allocator, true); + } + + public static VectorSchemaRoot createVectorSchemaRoot( + RowType rowType, BufferAllocator allocator, boolean allowUpperCase) { + List fields = + rowType.getFields().stream() + .map( + f -> + toArrowField( + allowUpperCase ? f.name() : f.name().toLowerCase(), + f.type())) + .collect(Collectors.toList()); + return VectorSchemaRoot.create(new Schema(fields), allocator); + } + + private static Field toArrowField(String fieldName, DataType dataType) { + FieldType fieldType = dataType.accept(ToFieldTypeVisitor.INSTANCE); + List children = null; + if (dataType instanceof ArrayType) { + children = + Collections.singletonList( + toArrowField( + ListVector.DATA_VECTOR_NAME, + ((ArrayType) dataType).getElementType())); + } else if (dataType instanceof MapType) { + MapType mapType = (MapType) dataType; + children = + Collections.singletonList( + new Field( + MapVector.DATA_VECTOR_NAME, + // data vector, key vector and value vector CANNOT be null + new FieldType(false, Types.MinorType.STRUCT.getType(), null), + Arrays.asList( + toArrowField( + MapVector.KEY_NAME, + mapType.getKeyType().notNull()), + toArrowField( + MapVector.VALUE_NAME, + mapType.getValueType().notNull())))); + } else if (dataType instanceof RowType) { + RowType rowType = (RowType) dataType; + children = + rowType.getFields().stream() + .map(f -> toArrowField(f.name(), f.type())) + .collect(Collectors.toList()); + } + return new Field(fieldName, fieldType, children); + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowWriter.java new file mode 100644 index 000000000000..dc52cc5ba3c9 --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowWriter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; + +import org.apache.arrow.vector.VectorSchemaRoot; + +import javax.annotation.Nullable; + +/** + * A reusable writer to convert Paimon data in {@link RecordReader.RecordIterator} into Arrow + * format. + */ +public abstract class ArrowWriter { + + // reusable + protected final VectorSchemaRoot root; + protected final ArrowFieldWriter[] fieldWriters; + + protected RecordReader.RecordIterator iterator; + + ArrowWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) { + this.root = root; + this.fieldWriters = fieldWriters; + } + + /** + * Write and get at most {@code maxBatchRows} data. Return null when finishing writing current + * iterator. + * + *

NOTE: the returned value will be reused, and it's lifecycle is managed by this writer. + */ + @Nullable + public VectorSchemaRoot next(int maxBatchRows) { + if (iterator == null) { + return null; + } + + for (ArrowFieldWriter fieldWriter : fieldWriters) { + fieldWriter.reset(); + } + doWrite(maxBatchRows); + return root; + } + + protected abstract void doWrite(int maxBatchRows); + + public void close() { + root.close(); + } + + protected void releaseIterator() { + iterator.releaseBatch(); + iterator = null; + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ToFieldTypeVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ToFieldTypeVisitor.java new file mode 100644 index 000000000000..a6f303378987 --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ToFieldTypeVisitor.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.FieldType; + +/** Convert Paimon type to {@link FieldType}. */ +public class ToFieldTypeVisitor implements DataTypeVisitor { + + public static final ToFieldTypeVisitor INSTANCE = new ToFieldTypeVisitor(); + + @Override + public FieldType visit(CharType charType) { + return new FieldType(charType.isNullable(), Types.MinorType.VARCHAR.getType(), null); + } + + @Override + public FieldType visit(VarCharType varCharType) { + return new FieldType(varCharType.isNullable(), Types.MinorType.VARCHAR.getType(), null); + } + + @Override + public FieldType visit(BooleanType booleanType) { + return new FieldType(booleanType.isNullable(), Types.MinorType.BIT.getType(), null); + } + + @Override + public FieldType visit(BinaryType binaryType) { + return new FieldType(binaryType.isNullable(), Types.MinorType.VARBINARY.getType(), null); + } + + @Override + public FieldType visit(VarBinaryType varBinaryType) { + return new FieldType(varBinaryType.isNullable(), Types.MinorType.VARBINARY.getType(), null); + } + + @Override + public FieldType visit(DecimalType decimalType) { + return new FieldType( + decimalType.isNullable(), + new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128), + null); + } + + @Override + public FieldType visit(TinyIntType tinyIntType) { + return new FieldType(tinyIntType.isNullable(), Types.MinorType.TINYINT.getType(), null); + } + + @Override + public FieldType visit(SmallIntType smallIntType) { + return new FieldType(smallIntType.isNullable(), Types.MinorType.SMALLINT.getType(), null); + } + + @Override + public FieldType visit(IntType intType) { + return new FieldType(intType.isNullable(), Types.MinorType.INT.getType(), null); + } + + @Override + public FieldType visit(BigIntType bigIntType) { + return new FieldType(bigIntType.isNullable(), Types.MinorType.BIGINT.getType(), null); + } + + @Override + public FieldType visit(FloatType floatType) { + return new FieldType(floatType.isNullable(), Types.MinorType.FLOAT4.getType(), null); + } + + @Override + public FieldType visit(DoubleType doubleType) { + return new FieldType(doubleType.isNullable(), Types.MinorType.FLOAT8.getType(), null); + } + + @Override + public FieldType visit(DateType dateType) { + return new FieldType(dateType.isNullable(), Types.MinorType.DATEDAY.getType(), null); + } + + @Override + public FieldType visit(TimeType timeType) { + return new FieldType(timeType.isNullable(), Types.MinorType.TIMEMILLI.getType(), null); + } + + @Override + public FieldType visit(TimestampType timestampType) { + return new FieldType( + timestampType.isNullable(), Types.MinorType.TIMESTAMPNANO.getType(), null); + } + + @Override + public FieldType visit(LocalZonedTimestampType localZonedTimestampType) { + return new FieldType( + localZonedTimestampType.isNullable(), + Types.MinorType.TIMESTAMPNANO.getType(), + null); + } + + @Override + public FieldType visit(ArrayType arrayType) { + return new FieldType(arrayType.isNullable(), Types.MinorType.LIST.getType(), null); + } + + @Override + public FieldType visit(MultisetType multisetType) { + throw new UnsupportedOperationException("Doesn't support MultisetType."); + } + + @Override + public FieldType visit(MapType mapType) { + return new FieldType(mapType.isNullable(), new ArrowType.Map(false), null); + } + + @Override + public FieldType visit(RowType rowType) { + return new FieldType(rowType.isNullable(), Types.MinorType.STRUCT.getType(), null); + } +} diff --git a/pom.xml b/pom.xml index d7d9c5430c0f..6213d059d571 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,7 @@ under the License. paimon-spark paimon-service paimon-test-utils + paimon-arrow tools/ci/paimon-ci-tools From c61054ba1f0ce3517b325204a94fb7dd39a9ec1e Mon Sep 17 00:00:00 2001 From: yuzelin Date: Thu, 15 Aug 2024 18:17:40 +0800 Subject: [PATCH 2/6] add test --- paimon-arrow/pom.xml | 47 + .../org/apache/paimon/arrow/ArrowUtils.java | 18 + .../arrow/{ => writer}/ArrowBatchWriter.java | 2 +- .../arrow/{ => writer}/ArrowFieldWriter.java | 2 +- .../{ => writer}/ArrowFieldWriterFactory.java | 2 +- .../ArrowFieldWriterFactoryVisitor.java | 180 +++ .../arrow/{ => writer}/ArrowFieldWriters.java | 2 +- .../arrow/{ => writer}/ArrowRowWriter.java | 2 +- .../arrow/{ => writer}/ArrowWriter.java | 2 +- .../src/test/java/ArrowWriterTest.java | 1015 +++++++++++++++++ .../src/test/resources/log4j2-test.properties | 28 + 11 files changed, 1294 insertions(+), 6 deletions(-) rename paimon-arrow/src/main/java/org/apache/paimon/arrow/{ => writer}/ArrowBatchWriter.java (98%) rename paimon-arrow/src/main/java/org/apache/paimon/arrow/{ => writer}/ArrowFieldWriter.java (98%) rename paimon-arrow/src/main/java/org/apache/paimon/arrow/{ => writer}/ArrowFieldWriterFactory.java (96%) create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java rename paimon-arrow/src/main/java/org/apache/paimon/arrow/{ => writer}/ArrowFieldWriters.java (99%) rename paimon-arrow/src/main/java/org/apache/paimon/arrow/{ => writer}/ArrowRowWriter.java (98%) rename paimon-arrow/src/main/java/org/apache/paimon/arrow/{ => writer}/ArrowWriter.java (98%) create mode 100644 paimon-arrow/src/test/java/ArrowWriterTest.java create mode 100644 paimon-arrow/src/test/resources/log4j2-test.properties diff --git a/paimon-arrow/pom.xml b/paimon-arrow/pom.xml index 2f5c12ade2b4..be2fbeb5631c 100644 --- a/paimon-arrow/pom.xml +++ b/paimon-arrow/pom.xml @@ -59,6 +59,53 @@ under the License. provided + + + + org.apache.paimon + paimon-format + ${project.version} + test + + + + org.apache.paimon + paimon-test-utils + ${project.version} + test + + + + org.apache.arrow + arrow-memory-unsafe + ${arrow.version} + test + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + test + + diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java index a9da0f782bc1..009dca8c63e4 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java @@ -18,12 +18,15 @@ package org.apache.paimon.arrow; +import org.apache.paimon.arrow.writer.ArrowFieldWriter; +import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; @@ -91,4 +94,19 @@ private static Field toArrowField(String fieldName, DataType dataType) { } return new Field(fieldName, fieldType, children); } + + public static ArrowFieldWriter[] createArrowFieldWriters( + VectorSchemaRoot vectorSchemaRoot, RowType rowType) { + ArrowFieldWriter[] fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()]; + List vectors = vectorSchemaRoot.getFieldVectors(); + + for (int i = 0; i < rowType.getFieldCount(); i++) { + fieldWriters[i] = + rowType.getTypeAt(i) + .accept(ArrowFieldWriterFactoryVisitor.INSTANCE) + .create(vectors.get(i)); + } + + return fieldWriters; + } } diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBatchWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowBatchWriter.java similarity index 98% rename from paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBatchWriter.java rename to paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowBatchWriter.java index 8bb963929b2d..3e765f618529 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBatchWriter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowBatchWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.arrow; +package org.apache.paimon.arrow.writer; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.columnar.ColumnVector; diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java similarity index 98% rename from paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriter.java rename to paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java index 5b7d32c4375a..f7b766d61c5e 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.arrow; +package org.apache.paimon.arrow.writer; import org.apache.paimon.data.DataGetters; import org.apache.paimon.data.columnar.ColumnVector; diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriterFactory.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactory.java similarity index 96% rename from paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriterFactory.java rename to paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactory.java index 6b1672cf18e5..651a8d8590f2 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriterFactory.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactory.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.arrow; +package org.apache.paimon.arrow.writer; import org.apache.arrow.vector.FieldVector; diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java new file mode 100644 index 000000000000..9c1a55ec33ea --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow.writer; + +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; + +import java.util.List; + +/** Visitor to create {@link ArrowFieldWriterFactory}. */ +public class ArrowFieldWriterFactoryVisitor implements DataTypeVisitor { + + public static final ArrowFieldWriterFactoryVisitor INSTANCE = + new ArrowFieldWriterFactoryVisitor(); + + @Override + public ArrowFieldWriterFactory visit(CharType charType) { + return ArrowFieldWriters.StringWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(VarCharType varCharType) { + return ArrowFieldWriters.StringWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(BooleanType booleanType) { + return ArrowFieldWriters.BooleanWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(BinaryType binaryType) { + return ArrowFieldWriters.BinaryWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(VarBinaryType varBinaryType) { + return ArrowFieldWriters.BinaryWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(DecimalType decimalType) { + return fieldVector -> + new ArrowFieldWriters.DecimalWriter( + fieldVector, decimalType.getPrecision(), decimalType.getScale()); + } + + @Override + public ArrowFieldWriterFactory visit(TinyIntType tinyIntType) { + return ArrowFieldWriters.TinyIntWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(SmallIntType smallIntType) { + return ArrowFieldWriters.SmallIntWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(IntType intType) { + return ArrowFieldWriters.IntWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(BigIntType bigIntType) { + return ArrowFieldWriters.BigIntWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(FloatType floatType) { + return ArrowFieldWriters.FloatWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(DoubleType doubleType) { + return ArrowFieldWriters.DoubleWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(DateType dateType) { + return ArrowFieldWriters.DateWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(TimeType timeType) { + return ArrowFieldWriters.TimeWriter::new; + } + + @Override + public ArrowFieldWriterFactory visit(TimestampType timestampType) { + return fieldVector -> + new ArrowFieldWriters.TimestampWriter( + fieldVector, timestampType.getPrecision(), null); + } + + @Override + public ArrowFieldWriterFactory visit(LocalZonedTimestampType localZonedTimestampType) { + return fieldVector -> + new ArrowFieldWriters.TimestampWriter( + fieldVector, localZonedTimestampType.getPrecision(), null); + } + + @Override + public ArrowFieldWriterFactory visit(ArrayType arrayType) { + ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this); + return fieldVector -> + new ArrowFieldWriters.ArrayWriter( + fieldVector, + elementWriterFactory.create(((ListVector) fieldVector).getDataVector())); + } + + @Override + public ArrowFieldWriterFactory visit(MultisetType multisetType) { + throw new UnsupportedOperationException("Doesn't support MultisetType."); + } + + @Override + public ArrowFieldWriterFactory visit(MapType mapType) { + ArrowFieldWriterFactory keyWriterFactory = mapType.getKeyType().accept(this); + ArrowFieldWriterFactory valueWriterFactory = mapType.getValueType().accept(this); + return fieldVector -> { + MapVector mapVector = (MapVector) fieldVector; + List keyValueVectors = mapVector.getDataVector().getChildrenFromFields(); + return new ArrowFieldWriters.MapWriter( + fieldVector, + keyWriterFactory.create(keyValueVectors.get(0)), + valueWriterFactory.create(keyValueVectors.get(1))); + }; + } + + @Override + public ArrowFieldWriterFactory visit(RowType rowType) { + return fieldVector -> { + List children = fieldVector.getChildrenFromFields(); + ArrowFieldWriter[] fieldWriters = new ArrowFieldWriter[children.size()]; + for (int i = 0; i < children.size(); i++) { + fieldWriters[i] = rowType.getTypeAt(i).accept(this).create(children.get(i)); + } + return new ArrowFieldWriters.RowWriter(fieldVector, fieldWriters); + }; + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriters.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java similarity index 99% rename from paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriters.java rename to paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java index 38de5a13eacc..100c9eb20d05 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldWriters.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.arrow; +package org.apache.paimon.arrow.writer; import org.apache.paimon.data.DataGetters; import org.apache.paimon.data.InternalArray; diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowRowWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowRowWriter.java similarity index 98% rename from paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowRowWriter.java rename to paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowRowWriter.java index 920b32270824..34bec6847268 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowRowWriter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowRowWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.arrow; +package org.apache.paimon.arrow.writer; import org.apache.paimon.data.InternalRow; import org.apache.paimon.reader.RecordReader; diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowWriter.java similarity index 98% rename from paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowWriter.java rename to paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowWriter.java index dc52cc5ba3c9..c02c77a10af3 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowWriter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.arrow; +package org.apache.paimon.arrow.writer; import org.apache.paimon.data.InternalRow; import org.apache.paimon.reader.RecordReader; diff --git a/paimon-arrow/src/test/java/ArrowWriterTest.java b/paimon-arrow/src/test/java/ArrowWriterTest.java new file mode 100644 index 000000000000..016f10fc800e --- /dev/null +++ b/paimon-arrow/src/test/java/ArrowWriterTest.java @@ -0,0 +1,1015 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.arrow.ArrowUtils; +import org.apache.paimon.arrow.writer.ArrowBatchWriter; +import org.apache.paimon.arrow.writer.ArrowFieldWriter; +import org.apache.paimon.arrow.writer.ArrowRowWriter; +import org.apache.paimon.arrow.writer.ArrowWriter; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator; +import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.VectorizedRecordIterator; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; +import org.apache.paimon.testutils.junit.parameterized.Parameters; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.StringUtils; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.JsonStringArrayList; +import org.apache.arrow.vector.util.JsonStringHashMap; +import org.apache.arrow.vector.util.Text; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import javax.annotation.Nullable; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +/** UT for {@link ArrowWriter}. */ +@ExtendWith(ParameterizedTestExtension.class) +public class ArrowWriterTest { + + private static final Random RND = ThreadLocalRandom.current(); + private @TempDir java.nio.file.Path tempDir; + private final String testMode; + private Catalog catalog; + + private static final boolean[] NULLABLE; + private static final RowType PRIMITIVE_TYPE; + + static { + int cnt = 18; + NULLABLE = new boolean[cnt]; + for (int i = 0; i < cnt; i++) { + NULLABLE[i] = RND.nextBoolean(); + } + + List dataFields = new ArrayList<>(); + dataFields.add(new DataField(0, "char", DataTypes.CHAR(10).copy(NULLABLE[0]))); + dataFields.add(new DataField(1, "varchar", DataTypes.VARCHAR(20).copy(NULLABLE[1]))); + dataFields.add(new DataField(2, "boolean", DataTypes.BOOLEAN().copy(NULLABLE[2]))); + dataFields.add(new DataField(3, "binary", DataTypes.BINARY(10).copy(NULLABLE[3]))); + dataFields.add(new DataField(4, "varbinary", DataTypes.VARBINARY(20).copy(NULLABLE[4]))); + dataFields.add(new DataField(5, "decimal1", DataTypes.DECIMAL(2, 2).copy(NULLABLE[5]))); + dataFields.add(new DataField(6, "decimal2", DataTypes.DECIMAL(38, 2).copy(NULLABLE[6]))); + dataFields.add(new DataField(7, "decimal3", DataTypes.DECIMAL(10, 1).copy(NULLABLE[7]))); + dataFields.add(new DataField(8, "tinyint", DataTypes.TINYINT().copy(NULLABLE[8]))); + dataFields.add(new DataField(9, "smallint", DataTypes.SMALLINT().copy(NULLABLE[9]))); + dataFields.add(new DataField(10, "int", DataTypes.INT().copy(NULLABLE[10]))); + dataFields.add(new DataField(11, "bigint", DataTypes.BIGINT().copy(NULLABLE[11]))); + dataFields.add(new DataField(12, "float", DataTypes.FLOAT().copy(NULLABLE[12]))); + dataFields.add(new DataField(13, "double", DataTypes.DOUBLE().copy(NULLABLE[13]))); + dataFields.add(new DataField(14, "date", DataTypes.DATE().copy(NULLABLE[14]))); + dataFields.add(new DataField(15, "timestamp3", DataTypes.TIMESTAMP(3).copy(NULLABLE[15]))); + dataFields.add(new DataField(16, "timestamp6", DataTypes.TIMESTAMP(6).copy(NULLABLE[16]))); + dataFields.add( + new DataField( + 17, + "timestampLZ9", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).copy(NULLABLE[17]))); + PRIMITIVE_TYPE = new RowType(dataFields); + } + + public ArrowWriterTest(String testMode) { + this.testMode = testMode; + } + + @SuppressWarnings("unused") + @Parameters(name = "test-mode = {0}") + public static List testMode() { + return Arrays.asList("vectorized_without_dv", "per_row", "vectorized_with_dv"); + } + + private void testDv(boolean testDv) { + assumeThat(testMode.equals("vectorized_with_dv")).isEqualTo(testDv); + } + + @BeforeEach + public void reset() throws Exception { + catalog = + CatalogFactory.createCatalog( + CatalogContext.create(new Path(tempDir.toUri().toString()))); + catalog.createDatabase("default", false); + } + + private RecordReader.RecordIterator createPrimitiveIterator( + List expected, @Nullable int[] projection) throws Exception { + // create InternalRows + int numRows = RND.nextInt(5) + 5; + List rows = new ArrayList<>(numRows); + for (int i = 0; i < numRows; i++) { + Object[] randomRowValues = randomRowValues(NULLABLE); + expected.add(randomRowValues); + rows.add(GenericRow.of(randomRowValues)); + } + + return getRecordIterator(PRIMITIVE_TYPE, rows, projection); + } + + @TestTemplate + public void testReadEmpty() throws Exception { + testDv(false); + List expected = new ArrayList<>(); + RecordReader.RecordIterator iterator = + createPrimitiveIterator(expected, new int[0]); + testReadEmpty(iterator, expected.size()); + } + + @TestTemplate + public void testPrimitiveTypes() throws Exception { + testDv(false); + List expected = new ArrayList<>(); + RecordReader.RecordIterator iterator = createPrimitiveIterator(expected, null); + int numRows = expected.size(); + try (RootAllocator allocator = new RootAllocator()) { + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(PRIMITIVE_TYPE, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, PRIMITIVE_TYPE, vsr); + arrowWriter.next(numRows); + assertThat(vsr.getRowCount()).isEqualTo(numRows); + + // validate field names + List actualNames = + vsr.getSchema().getFields().stream() + .map(Field::getName) + .collect(Collectors.toList()); + assertThat(actualNames).isEqualTo(PRIMITIVE_TYPE.getFieldNames()); + + // validate rows + List fieldVectors = vsr.getFieldVectors(); + for (int i = 0; i < numRows; i++) { + String expectedString = paimonObjectsToString(expected.get(i), PRIMITIVE_TYPE); + String actualString = arrowObjectsToString(fieldVectors, PRIMITIVE_TYPE, i); + assertThat(actualString).isEqualTo(expectedString); + } + + arrowWriter.close(); + } + } + + @TestTemplate + public void testArrayType() throws Exception { + testDv(false); + // build RowType + boolean nullable = RND.nextBoolean(); + RowType nestedArrayType = + RowType.builder() + .field("string_array", DataTypes.ARRAY(DataTypes.STRING()).copy(nullable)) + .build(); + + // create InternalRows + int numRows = RND.nextInt(5) + 5; + List> expectedStrings = new ArrayList<>(numRows); + List rows = new ArrayList<>(numRows); + for (int i = 0; i < numRows; i++) { + if (nullable && RND.nextBoolean()) { + expectedStrings.add(null); + rows.add(GenericRow.of((Object) null)); + continue; + } + + int currentSize = RND.nextInt(5); + List currentStringArray = + IntStream.range(0, currentSize) + .mapToObj(idx -> StringUtils.getRandomString(RND, 1, 10)) + .collect(Collectors.toList()); + GenericArray array = + new GenericArray( + currentStringArray.stream().map(BinaryString::fromString).toArray()); + rows.add(GenericRow.of(array)); + expectedStrings.add(currentStringArray); + } + + RecordReader.RecordIterator iterator = + getRecordIterator(nestedArrayType, rows); + try (RootAllocator allocator = new RootAllocator()) { + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr); + arrowWriter.next(numRows); + assertThat(vsr.getRowCount()).isEqualTo(numRows); + + List fieldVectors = vsr.getFieldVectors(); + assertThat(fieldVectors.size()).isEqualTo(1); + FieldVector fieldVector = fieldVectors.get(0); + assertThat(fieldVector.getName()).isEqualTo("string_array"); + for (int i = 0; i < numRows; i++) { + Object obj = fieldVector.getObject(i); + List expected = expectedStrings.get(i); + if (obj == null) { + assertThat(expected).isNull(); + } else { + JsonStringArrayList actual = (JsonStringArrayList) obj; + assertThat(actual.size()).isEqualTo(expected.size()); + for (int j = 0; j < actual.size(); j++) { + assertThat(actual.get(j).toString()).isEqualTo(expected.get(j)); + } + } + } + + arrowWriter.close(); + } + } + + @TestTemplate + public void testMapType() throws Exception { + testDv(false); + // build RowType + boolean nullable = RND.nextBoolean(); + RowType nestedMapType = + RowType.builder() + .field( + "map", + DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()).copy(nullable)) + .build(); + + // create InternalRows + int numRows = RND.nextInt(5) + 5; + List> expectedMaps = new ArrayList<>(numRows); + List rows = new ArrayList<>(numRows); + for (int i = 0; i < numRows; i++) { + if (nullable && RND.nextBoolean()) { + expectedMaps.add(null); + rows.add(GenericRow.of((Object) null)); + continue; + } + + int currentSize = RND.nextInt(5); + Map map1 = new HashMap<>(currentSize); + Map map2 = new HashMap<>(currentSize); + for (int j = 0; j < currentSize; j++) { + String value = StringUtils.getRandomString(RND, 1, 10); + map1.put(j, value); + map2.put(j, BinaryString.fromString(value)); + } + rows.add(GenericRow.of(new GenericMap(map2))); + expectedMaps.add(map1); + } + + RecordReader.RecordIterator iterator = getRecordIterator(nestedMapType, rows); + try (RootAllocator allocator = new RootAllocator()) { + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr); + arrowWriter.next(numRows); + assertThat(vsr.getRowCount()).isEqualTo(numRows); + + List fieldVectors = vsr.getFieldVectors(); + assertThat(fieldVectors.size()).isEqualTo(1); + FieldVector fieldVector = fieldVectors.get(0); + assertThat(fieldVector.getName()).isEqualTo("map"); + for (int i = 0; i < numRows; i++) { + Object obj = fieldVector.getObject(i); + Map expected = expectedMaps.get(i); + if (obj == null) { + assertThat(expected).isNull(); + } else { + JsonStringArrayList> actual = + (JsonStringArrayList>) obj; + assertThat(actual.size()).isEqualTo(expected.size()); + for (JsonStringHashMap actualMap : actual) { + int key = (int) actualMap.get(MapVector.KEY_NAME); + String value = actualMap.get(MapVector.VALUE_NAME).toString(); + assertThat(expected.get(key)).isEqualTo(value); + } + } + } + + arrowWriter.close(); + } + } + + @TestTemplate + public void testMapRowType() throws Exception { + testDv(false); + // build RowType + RowType nestedMapRowType = + new RowType( + Collections.singletonList( + DataTypes.FIELD( + 2, + "map_row", + DataTypes.MAP( + DataTypes.INT(), + DataTypes.ROW(DataTypes.INT(), DataTypes.INT()))))); + + // create InternalRows + InternalRow row1 = GenericRow.of((Object) null); + InternalRow row2 = GenericRow.of(new GenericMap(new HashMap<>())); + + Map map3 = new HashMap<>(); + map3.put(1, GenericRow.of(1, 1)); + map3.put(2, GenericRow.of(2, null)); + map3.put(3, null); + InternalRow row3 = GenericRow.of(new GenericMap(map3)); + + RecordReader.RecordIterator iterator = + getRecordIterator(nestedMapRowType, Arrays.asList(row1, row2, row3)); + try (RootAllocator allocator = new RootAllocator()) { + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapRowType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, nestedMapRowType, vsr); + arrowWriter.next(3); + assertThat(vsr.getRowCount()).isEqualTo(3); + + List fieldVectors = vsr.getFieldVectors(); + assertThat(fieldVectors.size()).isEqualTo(1); + FieldVector fieldVector = fieldVectors.get(0); + assertThat(fieldVector.getName()).isEqualTo("map_row"); + + assertThat(fieldVector.getObject(0)).isNull(); + assertThat(fieldVector.getObject(1).toString()).isEqualTo("[]"); + assertThat(fieldVector.getObject(2).toString()) + .isEqualTo( + "[{\"key\":1,\"value\":{\"f0\":1,\"f1\":1}},{\"key\":2,\"value\":{\"f0\":2}},{\"key\":3}]"); + + arrowWriter.close(); + } + } + + @TestTemplate + public void testRowType() throws Exception { + testDv(false); + testRowTypeImpl(false); + } + + @TestTemplate + public void testRowTypeWithAllNull() throws Exception { + testDv(false); + testRowTypeImpl(true); + } + + private void testRowTypeImpl(boolean allNull) throws Exception { + testDv(false); + // build RowType + boolean nullable = allNull || RND.nextBoolean(); + RowType nestedRowType = + new RowType( + Collections.singletonList( + DataTypes.FIELD(19, "row", PRIMITIVE_TYPE.copy(nullable)))); + + // create InternalRows + int numRows = RND.nextInt(5) + 5; + List expected = new ArrayList<>(numRows); + List rows = new ArrayList<>(numRows); + for (int i = 0; i < numRows; i++) { + if (allNull || (nullable && RND.nextBoolean())) { + expected.add(null); + rows.add(GenericRow.of((Object) null)); + continue; + } + Object[] randomRowValues = randomRowValues(NULLABLE); + expected.add(randomRowValues); + rows.add(GenericRow.of(GenericRow.of(randomRowValues))); + } + + RecordReader.RecordIterator iterator = getRecordIterator(nestedRowType, rows); + try (RootAllocator allocator = new RootAllocator()) { + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, nestedRowType, vsr); + arrowWriter.next(numRows); + assertThat(vsr.getRowCount()).isEqualTo(numRows); + + List fieldVectors = vsr.getFieldVectors(); + assertThat(fieldVectors.size()).isEqualTo(1); + StructVector structVector = (StructVector) fieldVectors.get(0); + assertThat(structVector.getName()).isEqualTo("row"); + List children = structVector.getChildrenFromFields(); + // validate field names + List actualNames = + children.stream().map(ValueVector::getName).collect(Collectors.toList()); + assertThat(actualNames).isEqualTo(PRIMITIVE_TYPE.getFieldNames()); + // validate nested rows + for (int i = 0; i < numRows; i++) { + if (structVector.isNull(i)) { + assertThat(expected.get(i)).isNull(); + } else { + String expectedString = paimonObjectsToString(expected.get(i), PRIMITIVE_TYPE); + String actualString = arrowObjectsToString(children, PRIMITIVE_TYPE, i); + assertThat(actualString).isEqualTo(expectedString); + } + } + + arrowWriter.close(); + } + } + + @TestTemplate + public void testSliceIntType() throws Exception { + testDv(false); + RowType rowType = RowType.builder().field("int", DataTypes.INT()).build(); + List rows = new ArrayList<>(8); + + for (int i = 0; i < 8; i++) { + rows.add(GenericRow.of(i)); + } + + RecordReader.RecordIterator iterator = getRecordIterator(rowType, rows); + try (RootAllocator allocator = new RootAllocator()) { + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, rowType, vsr); + + // write 3 times + arrowWriter.next(3); + assertThat(vsr.getRowCount()).isEqualTo(3); + assertThat(vsr.getFieldVectors().get(0).toString()).isEqualTo("[0, 1, 2]"); + + arrowWriter.next(3); + assertThat(vsr.getRowCount()).isEqualTo(3); + assertThat(vsr.getFieldVectors().get(0).toString()).isEqualTo("[3, 4, 5]"); + + arrowWriter.next(3); + assertThat(vsr.getRowCount()).isEqualTo(2); + assertThat(vsr.getFieldVectors().get(0).toString()).isEqualTo("[6, 7]"); + + arrowWriter.close(); + } + } + + @TestTemplate + public void testDvWithSimpleRowType() throws Exception { + testDv(true); + boolean nullable = RND.nextBoolean(); + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT()) + .field("value", DataTypes.STRING().copy(nullable)) + .build(); + + int numRows = RND.nextInt(100) + 200; + List rows = new ArrayList<>(); + Map strings = new HashMap<>(); + for (int i = 0; i < numRows; i++) { + if (nullable && RND.nextBoolean()) { + rows.add(GenericRow.of(i, null)); + strings.put(i, null); + } else { + String value = StringUtils.getRandomString(RND, 50, 50); + strings.put(i, value); + rows.add(GenericRow.of(i, BinaryString.fromString(value))); + } + } + + Set deleted = getDeletedPks(numRows); + boolean readEmpty = RND.nextBoolean(); + int[] projection = readEmpty ? new int[0] : null; + RecordReader.RecordIterator iterator = + getApplyDeletionFileRecordIterator( + rowType, rows, deleted, Collections.singletonList("pk"), projection); + if (readEmpty) { + testReadEmpty(iterator, numRows - deleted.size()); + } else { + try (RootAllocator allocator = new RootAllocator()) { + Set expectedPks = getExpectedPks(numRows, deleted); + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, rowType, vsr); + int expectedRowCount = rows.size() - deleted.size(); + int averageBatchRows = RND.nextInt(expectedRowCount) + 1; + int readRows = 0; + Set readPks = new HashSet<>(); + while (readRows < expectedRowCount) { + arrowWriter.next(averageBatchRows); + readRows += vsr.getRowCount(); + // validate current result + IntVector pkVector = (IntVector) vsr.getVector(0); + FieldVector valueVector = vsr.getVector(1); + for (int i = 0; i < vsr.getRowCount(); i++) { + int pk = pkVector.get(i); + assertThat(expectedPks).contains(pk); + String value = strings.get(pk); + if (value == null) { + assertThat(valueVector.isNull(i)).isTrue(); + } else { + assertThat(valueVector.getObject(i).toString()).isEqualTo(value); + } + readPks.add(pk); + } + } + assertThat(readRows).isEqualTo(expectedRowCount); + assertThat(readPks).isEqualTo(expectedPks); + arrowWriter.close(); + } + } + } + + @TestTemplate + public void testDvWithArrayType() throws Exception { + testDv(true); + // build RowType + boolean nullable = RND.nextBoolean(); + RowType nestedArrayType = + RowType.builder() + .field("pk", DataTypes.INT()) + .field("string_array", DataTypes.ARRAY(DataTypes.STRING()).copy(nullable)) + .build(); + + int numRows = RND.nextInt(100) + 200; + List rows = new ArrayList<>(); + Map> arrayStrings = new HashMap<>(); + for (int i = 0; i < numRows; i++) { + if (nullable && RND.nextBoolean()) { + rows.add(GenericRow.of(i, null)); + arrayStrings.put(i, null); + continue; + } + int currentSize = RND.nextInt(5); + List currentStrings = + IntStream.range(0, currentSize) + .mapToObj(idx -> StringUtils.getRandomString(RND, 50, 50)) + .collect(Collectors.toList()); + arrayStrings.put(i, currentStrings); + BinaryString[] binaryStrings = + currentStrings.stream() + .map(BinaryString::fromString) + .toArray(it -> new BinaryString[currentSize]); + rows.add(GenericRow.of(i, new GenericArray(binaryStrings))); + } + + Set deleted = getDeletedPks(numRows); + RecordReader.RecordIterator iterator = + getApplyDeletionFileRecordIterator( + nestedArrayType, rows, deleted, Collections.singletonList("pk"), null); + try (RootAllocator allocator = new RootAllocator()) { + Set expectedPks = getExpectedPks(numRows, deleted); + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr); + int expectedRowCount = rows.size() - deleted.size(); + int averageBatchRows = RND.nextInt(expectedRowCount) + 1; + int readRows = 0; + Set readPks = new HashSet<>(); + while ((vsr = arrowWriter.next(averageBatchRows)) != null) { + readRows += vsr.getRowCount(); + // validate current result + IntVector pkVector = (IntVector) vsr.getVector(0); + ListVector listVector = (ListVector) vsr.getVector(1); + for (int i = 0; i < vsr.getRowCount(); i++) { + int pk = pkVector.get(i); + assertThat(expectedPks).contains(pk); + List value = arrayStrings.get(pk); + if (value == null) { + assertThat(listVector.isNull(i)).isTrue(); + } else { + List objects = listVector.getObject(i); + assertThat(objects.size()).isEqualTo(value.size()); + for (int j = 0; j < objects.size(); j++) { + assertThat(objects.get(j).toString()).isEqualTo(value.get(j)); + } + } + readPks.add(pk); + } + } + assertThat(readRows).isEqualTo(expectedRowCount); + assertThat(readPks).isEqualTo(expectedPks); + arrowWriter.close(); + } + } + + @TestTemplate + public void testDvWithMapType() throws Exception { + testDv(true); + // build RowType + boolean nullable = RND.nextBoolean(); + RowType nestedMapType = + RowType.builder() + .field("pk", DataTypes.INT()) + .field( + "map", + DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()).copy(nullable)) + .build(); + + int numRows = RND.nextInt(100) + 200; + List rows = new ArrayList<>(); + Map> mapValueStrings = new HashMap<>(); + for (int i = 0; i < numRows; i++) { + if (nullable && RND.nextBoolean()) { + rows.add(GenericRow.of(i, null)); + mapValueStrings.put(i, null); + continue; + } + int currentSize = RND.nextInt(5); + List currentStrings = + IntStream.range(0, currentSize) + .mapToObj(idx -> StringUtils.getRandomString(RND, 50, 50)) + .collect(Collectors.toList()); + mapValueStrings.put(i, currentStrings); + Map map = + IntStream.range(0, currentSize) + .boxed() + .collect( + Collectors.toMap( + idx -> idx, + idx -> + BinaryString.fromString( + currentStrings.get(idx)))); + rows.add(GenericRow.of(i, new GenericMap(map))); + } + Set deleted = getDeletedPks(numRows); + RecordReader.RecordIterator iterator = + getApplyDeletionFileRecordIterator( + nestedMapType, rows, deleted, Collections.singletonList("pk"), null); + try (RootAllocator allocator = new RootAllocator()) { + Set expectedPks = getExpectedPks(numRows, deleted); + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr); + int expectedRowCount = rows.size() - deleted.size(); + int averageBatchRows = RND.nextInt(expectedRowCount) + 1; + int readRows = 0; + Set readPks = new HashSet<>(); + while ((vsr = arrowWriter.next(averageBatchRows)) != null) { + readRows += vsr.getRowCount(); + // validate current result + IntVector pkVector = (IntVector) vsr.getVector(0); + MapVector mapVector = (MapVector) vsr.getVector(1); + for (int i = 0; i < vsr.getRowCount(); i++) { + int pk = pkVector.get(i); + assertThat(expectedPks).contains(pk); + List value = mapValueStrings.get(pk); + if (value == null) { + assertThat(mapVector.isNull(i)).isTrue(); + } else { + JsonStringArrayList list = + (JsonStringArrayList) mapVector.getObject(i); + assertThat(list.size()).isEqualTo(value.size()); + for (int j = 0; j < list.size(); j++) { + JsonStringHashMap map = (JsonStringHashMap) list.get(j); + assertThat(map.size()).isEqualTo(2); + assertThat(map.get(MapVector.KEY_NAME)).isEqualTo(j); + assertThat(map.get(MapVector.VALUE_NAME).toString()) + .isEqualTo(value.get(j)); + } + } + readPks.add(pk); + } + } + assertThat(readRows).isEqualTo(expectedRowCount); + assertThat(readPks).isEqualTo(expectedPks); + arrowWriter.close(); + } + } + + @TestTemplate + public void testDvWithRowType() throws Exception { + testDv(true); + // build RowType + boolean nullable = RND.nextBoolean(); + RowType nestedRowType = + new RowType( + Arrays.asList( + DataTypes.FIELD(19, "pk", DataTypes.INT()), + DataTypes.FIELD(20, "row", PRIMITIVE_TYPE.copy(nullable)))); + + int numRows = RND.nextInt(100) + 200; + List rows = new ArrayList<>(); + Map expectedValues = new HashMap<>(); + for (int i = 0; i < numRows; i++) { + if (nullable && RND.nextBoolean()) { + rows.add(GenericRow.of(i, null)); + expectedValues.put(i, null); + continue; + } + Object[] randomRowValues = randomRowValues(NULLABLE); + expectedValues.put(i, randomRowValues); + rows.add(GenericRow.of(i, GenericRow.of(randomRowValues))); + } + + Set deleted = getDeletedPks(numRows); + RecordReader.RecordIterator iterator = + getApplyDeletionFileRecordIterator( + nestedRowType, rows, deleted, Collections.singletonList("pk"), null); + try (RootAllocator allocator = new RootAllocator()) { + Set expectedPks = getExpectedPks(numRows, deleted); + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, nestedRowType, vsr); + int expectedRowCount = rows.size() - deleted.size(); + int averageBatchRows = RND.nextInt(expectedRowCount) + 1; + int readRows = 0; + Set readPks = new HashSet<>(); + while ((vsr = arrowWriter.next(averageBatchRows)) != null) { + readRows += vsr.getRowCount(); + // validate current result + IntVector pkVector = (IntVector) vsr.getVector(0); + StructVector structVector = (StructVector) vsr.getVector(1); + for (int i = 0; i < vsr.getRowCount(); i++) { + int pk = pkVector.get(i); + assertThat(expectedPks).contains(pk); + Object[] expected = expectedValues.get(pk); + if (expected == null) { + assertThat(structVector.isNull(i)).isTrue(); + } else { + String expectedString = paimonObjectsToString(expected, PRIMITIVE_TYPE); + String actualString = + arrowObjectsToString( + structVector.getChildrenFromFields(), PRIMITIVE_TYPE, i); + assertThat(actualString).isEqualTo(expectedString); + } + readPks.add(pk); + } + } + assertThat(readRows).isEqualTo(expectedRowCount); + assertThat(readPks).isEqualTo(expectedPks); + arrowWriter.close(); + } + } + + private Set getDeletedPks(int numRows) { + int numDeleted = RND.nextInt(50) + 1; + Set deleted = new HashSet<>(); + while (deleted.size() < numDeleted) { + deleted.add(RND.nextInt(numRows)); + } + return deleted; + } + + private Set getExpectedPks(int numRows, Set deletedPks) { + return IntStream.range(0, numRows) + .filter(i -> !deletedPks.contains(i)) + .boxed() + .collect(Collectors.toSet()); + } + + private void testReadEmpty( + RecordReader.RecordIterator iterator, int expectedRowCount) { + try (RootAllocator allocator = new RootAllocator()) { + RowType rowType = RowType.of(); + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator); + ArrowWriter arrowWriter = createArrowWriter(iterator, rowType, vsr); + arrowWriter.next(expectedRowCount); + assertThat(vsr.getRowCount()).isEqualTo(expectedRowCount); + assertThat(vsr.getSchema().getFields()).isEmpty(); + assertThat(vsr.getFieldVectors()).isEmpty(); + arrowWriter.close(); + } + } + + private RecordReader.RecordIterator getRecordIterator( + RowType rowType, List rows) throws Exception { + return getRecordIterator(rowType, rows, null); + } + + private RecordReader.RecordIterator getRecordIterator( + RowType rowType, List rows, @Nullable int[] projection) throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : "parquet"); + FileStoreTable table = createFileStoreTable(rowType, Collections.emptyList(), options); + + StreamTableWrite write = table.newStreamWriteBuilder().newWrite(); + write.withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newStreamWriteBuilder().newCommit(); + for (InternalRow row : rows) { + write.write(row); + } + commit.commit(0, write.prepareCommit(false, 0)); + + return table.newRead() + .withProjection(projection) + .createReader(table.newReadBuilder().newScan().plan()) + .readBatch(); + } + + private RecordReader.RecordIterator getApplyDeletionFileRecordIterator( + RowType rowType, + List rows, + Set deletedPks, + List primaryKeys, + @Nullable int[] projection) + throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); + options.put(CoreOptions.BUCKET.key(), "1"); + options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : "parquet"); + FileStoreTable table = createFileStoreTable(rowType, primaryKeys, options); + + StreamTableWrite write = table.newStreamWriteBuilder().newWrite(); + write.withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newStreamWriteBuilder().newCommit(); + + for (InternalRow row : rows) { + write.write(row); + } + commit.commit(0, write.prepareCommit(true, 0)); + + for (int delete : deletedPks) { + GenericRow row = rows.get(delete); + Object[] fields = + IntStream.range(0, row.getFieldCount()).mapToObj(row::getField).toArray(); + write.write(GenericRow.ofKind(RowKind.DELETE, fields)); + } + commit.commit(1, write.prepareCommit(true, 1)); + + RecordReader.RecordIterator iterator = + table.newRead() + .withProjection(projection) + .createReader(table.newReadBuilder().newScan().plan()) + .readBatch(); + assertThat(isVectorizedWithDv(iterator)).isTrue(); + return iterator; + } + + private FileStoreTable createFileStoreTable( + RowType rowType, List primaryKeys, Map options) + throws Exception { + Schema schema = + new Schema(rowType.getFields(), Collections.emptyList(), primaryKeys, options, ""); + Identifier identifier = Identifier.create("default", UUID.randomUUID().toString()); + catalog.createTable(identifier, schema, false); + return (FileStoreTable) catalog.getTable(identifier); + } + + private ArrowWriter createArrowWriter( + RecordReader.RecordIterator iterator, + RowType rowType, + VectorSchemaRoot vsr) { + ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType); + if (testMode.equals("vectorized_without_dv")) { + ArrowBatchWriter batchWriter = new ArrowBatchWriter(vsr, fieldWriters); + batchWriter.reset((VectorizedRecordIterator) iterator); + return batchWriter; + } else if (testMode.equals("vectorized_with_dv")) { + ArrowBatchWriter batchWriter = new ArrowBatchWriter(vsr, fieldWriters); + batchWriter.reset((ApplyDeletionFileRecordIterator) iterator); + return batchWriter; + } else { + ArrowRowWriter rowWriter = new ArrowRowWriter(vsr, fieldWriters); + rowWriter.reset(iterator); + return rowWriter; + } + } + + private boolean isVectorizedWithDv(RecordReader.RecordIterator iterator) { + if (iterator instanceof ApplyDeletionFileRecordIterator) { + ApplyDeletionFileRecordIterator deletionIterator = + (ApplyDeletionFileRecordIterator) iterator; + FileRecordIterator innerIterator = deletionIterator.iterator(); + return innerIterator instanceof VectorizedRecordIterator; + } + return false; + } + + private Object[] randomRowValues(boolean[] nullable) { + Object[] values = new Object[18]; + values[0] = BinaryString.fromString(StringUtils.getRandomString(RND, 10, 10)); + values[1] = BinaryString.fromString(StringUtils.getRandomString(RND, 1, 20)); + values[2] = RND.nextBoolean(); + values[3] = randomBytes(10, 10); + values[4] = randomBytes(1, 20); + values[5] = Decimal.fromBigDecimal(new BigDecimal("0.22"), 2, 2); + values[6] = Decimal.fromBigDecimal(new BigDecimal("12312455.22"), 38, 2); + values[7] = Decimal.fromBigDecimal(new BigDecimal("12455.1"), 10, 1); + values[8] = (byte) RND.nextInt(Byte.MAX_VALUE); + values[9] = (short) RND.nextInt(Short.MAX_VALUE); + values[10] = RND.nextInt(); + values[11] = RND.nextLong(); + values[12] = RND.nextFloat(); + values[13] = RND.nextDouble(); + values[14] = RND.nextInt(); + values[15] = Timestamp.fromEpochMillis(RND.nextInt(1000)); + values[16] = Timestamp.fromEpochMillis(RND.nextInt(1000), RND.nextInt(1000) * 1000); + values[17] = Timestamp.fromEpochMillis(RND.nextInt(1000), RND.nextInt(1000_000)); + + for (int i = 0; i < 18; i++) { + if (nullable[i] && RND.nextBoolean()) { + values[i] = null; + } + } + + return values; + } + + private byte[] randomBytes(int minLength, int maxLength) { + int len = RND.nextInt(maxLength - minLength + 1) + minLength; + byte[] bytes = new byte[len]; + for (int i = 0; i < len; i++) { + bytes[i] = (byte) RND.nextInt(10); + } + return bytes; + } + + private String paimonObjectsToString(Object[] values, RowType rowType) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 18; i++) { + if (i > 0) { + sb.append(" | "); + } + + Object value = values[i]; + + if (value == null) { + sb.append("null"); + continue; + } + + switch (rowType.getTypeAt(i).getTypeRoot()) { + case BINARY: + case VARBINARY: + // transfer bytes to string + sb.append(new String((byte[]) value)); + continue; + case TIME_WITHOUT_TIME_ZONE: + // transfer integer to LocalDateTime string + sb.append( + DateTimeUtils.toLocalDateTime( + (int) value, DateTimeUtils.UTC_ZONE.toZoneId())); + continue; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // transfer Timestamp to LocalDateTime string + sb.append(((Timestamp) value).toLocalDateTime()); + continue; + default: + sb.append(value); + } + } + + return sb.toString(); + } + + private String arrowObjectsToString(List fieldVectors, RowType rowType, int row) { + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < 18; i++) { + if (i > 0) { + sb.append(" | "); + } + + FieldVector currentVector = fieldVectors.get(i); + Object value = currentVector.getObject(row); + + if (value == null) { + sb.append("null"); + continue; + } + + switch (rowType.getTypeAt(i).getTypeRoot()) { + case BINARY: + case VARBINARY: + // transfer bytes to string + sb.append(new String((byte[]) value)); + continue; + default: + sb.append(value); + } + } + + return sb.toString(); + } +} diff --git a/paimon-arrow/src/test/resources/log4j2-test.properties b/paimon-arrow/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000000..1b3980d15104 --- /dev/null +++ b/paimon-arrow/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n From 7fd41a3fdfc4b3720918b3439ac957c52b43d1ac Mon Sep 17 00:00:00 2001 From: yuzelin Date: Thu, 15 Aug 2024 20:50:50 +0800 Subject: [PATCH 3/6] fix --- paimon-arrow/pom.xml | 3 +- .../arrow/ArrowFieldTypeConversion.java | 184 ++++++++++++++++++ .../org/apache/paimon/arrow/ArrowUtils.java | 40 +++- .../paimon/arrow/ToFieldTypeVisitor.java | 158 --------------- .../arrow/writer/ArrowFieldWriters.java | 25 +-- .../paimon/arrow/writer}/ArrowWriterTest.java | 14 +- 6 files changed, 240 insertions(+), 184 deletions(-) create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java delete mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ToFieldTypeVisitor.java rename paimon-arrow/src/test/java/{ => org/apache/paimon/arrow/writer}/ArrowWriterTest.java (99%) diff --git a/paimon-arrow/pom.xml b/paimon-arrow/pom.xml index be2fbeb5631c..81183895c4f0 100644 --- a/paimon-arrow/pom.xml +++ b/paimon-arrow/pom.xml @@ -31,11 +31,12 @@ under the License. paimon-arrow Paimon : Arrow + jar + 14.0.0 - diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java new file mode 100644 index 000000000000..e8079fe4d8c4 --- /dev/null +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.arrow; + +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.FieldType; + +import java.util.TimeZone; + +/** Utils for conversion between Paimon {@link DataType} and Arrow {@link FieldType}. */ +public class ArrowFieldTypeConversion { + + public static final ArrowFieldTypeVisitor ARROW_FIELD_TYPE_VISITOR = + new ArrowFieldTypeVisitor(); + + private static class ArrowFieldTypeVisitor implements DataTypeVisitor { + + @Override + public FieldType visit(CharType charType) { + return new FieldType(charType.isNullable(), Types.MinorType.VARCHAR.getType(), null); + } + + @Override + public FieldType visit(VarCharType varCharType) { + return new FieldType(varCharType.isNullable(), Types.MinorType.VARCHAR.getType(), null); + } + + @Override + public FieldType visit(BooleanType booleanType) { + return new FieldType(booleanType.isNullable(), Types.MinorType.BIT.getType(), null); + } + + @Override + public FieldType visit(BinaryType binaryType) { + return new FieldType( + binaryType.isNullable(), Types.MinorType.VARBINARY.getType(), null); + } + + @Override + public FieldType visit(VarBinaryType varBinaryType) { + return new FieldType( + varBinaryType.isNullable(), Types.MinorType.VARBINARY.getType(), null); + } + + @Override + public FieldType visit(DecimalType decimalType) { + return new FieldType( + decimalType.isNullable(), + new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128), + null); + } + + @Override + public FieldType visit(TinyIntType tinyIntType) { + return new FieldType(tinyIntType.isNullable(), Types.MinorType.TINYINT.getType(), null); + } + + @Override + public FieldType visit(SmallIntType smallIntType) { + return new FieldType( + smallIntType.isNullable(), Types.MinorType.SMALLINT.getType(), null); + } + + @Override + public FieldType visit(IntType intType) { + return new FieldType(intType.isNullable(), Types.MinorType.INT.getType(), null); + } + + @Override + public FieldType visit(BigIntType bigIntType) { + return new FieldType(bigIntType.isNullable(), Types.MinorType.BIGINT.getType(), null); + } + + @Override + public FieldType visit(FloatType floatType) { + return new FieldType(floatType.isNullable(), Types.MinorType.FLOAT4.getType(), null); + } + + @Override + public FieldType visit(DoubleType doubleType) { + return new FieldType(doubleType.isNullable(), Types.MinorType.FLOAT8.getType(), null); + } + + @Override + public FieldType visit(DateType dateType) { + return new FieldType(dateType.isNullable(), Types.MinorType.DATEDAY.getType(), null); + } + + @Override + public FieldType visit(TimeType timeType) { + return new FieldType(timeType.isNullable(), Types.MinorType.TIMEMILLI.getType(), null); + } + + @Override + public FieldType visit(TimestampType timestampType) { + int precision = timestampType.getPrecision(); + TimeUnit timeUnit = getTimeUnit(precision); + ArrowType arrowType = new ArrowType.Timestamp(timeUnit, null); + return new FieldType(timestampType.isNullable(), arrowType, null); + } + + @Override + public FieldType visit(LocalZonedTimestampType localZonedTimestampType) { + int precision = localZonedTimestampType.getPrecision(); + TimeUnit timeUnit = getTimeUnit(precision); + ArrowType arrowType = + new ArrowType.Timestamp(timeUnit, TimeZone.getDefault().toString()); + return new FieldType(localZonedTimestampType.isNullable(), arrowType, null); + } + + private TimeUnit getTimeUnit(int precision) { + if (precision == 0) { + return TimeUnit.SECOND; + } else if (precision >= 1 && precision <= 3) { + return TimeUnit.MILLISECOND; + } else if (precision >= 4 && precision <= 6) { + return TimeUnit.MICROSECOND; + } else { + return TimeUnit.NANOSECOND; + } + } + + @Override + public FieldType visit(ArrayType arrayType) { + return new FieldType(arrayType.isNullable(), Types.MinorType.LIST.getType(), null); + } + + @Override + public FieldType visit(MultisetType multisetType) { + throw new UnsupportedOperationException("Doesn't support MultisetType."); + } + + @Override + public FieldType visit(MapType mapType) { + return new FieldType(mapType.isNullable(), new ArrowType.Map(false), null); + } + + @Override + public FieldType visit(RowType rowType) { + return new FieldType(rowType.isNullable(), Types.MinorType.STRUCT.getType(), null); + } + } +} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java index 009dca8c63e4..462de41f1859 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java @@ -20,6 +20,7 @@ import org.apache.paimon.arrow.writer.ArrowFieldWriter; import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor; +import org.apache.paimon.data.Timestamp; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.MapType; @@ -35,6 +36,10 @@ import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; +import javax.annotation.Nullable; + +import java.time.Instant; +import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -62,7 +67,7 @@ public static VectorSchemaRoot createVectorSchemaRoot( } private static Field toArrowField(String fieldName, DataType dataType) { - FieldType fieldType = dataType.accept(ToFieldTypeVisitor.INSTANCE); + FieldType fieldType = dataType.accept(ArrowFieldTypeConversion.ARROW_FIELD_TYPE_VISITOR); List children = null; if (dataType instanceof ArrayType) { children = @@ -109,4 +114,37 @@ public static ArrowFieldWriter[] createArrowFieldWriters( return fieldWriters; } + + public static long timestampToEpoch( + Timestamp timestamp, int precision, @Nullable ZoneId castZoneId) { + return castZoneId == null + ? nonCastedTimestampToEpoch(timestamp, precision) + : zoneCastedTimestampZoneCastToEpoch(timestamp, precision, castZoneId); + } + + private static long nonCastedTimestampToEpoch(Timestamp timestamp, int precision) { + if (precision == 0) { + return timestamp.getMillisecond() / 1000; + } else if (precision >= 1 && precision <= 3) { + return timestamp.getMillisecond(); + } else if (precision >= 4 && precision <= 6) { + return timestamp.toMicros(); + } else { + return timestamp.getMillisecond() * 1_000_000 + timestamp.getNanoOfMillisecond(); + } + } + + private static long zoneCastedTimestampZoneCastToEpoch( + Timestamp timestamp, int precision, ZoneId castZoneId) { + Instant instant = timestamp.toLocalDateTime().atZone(castZoneId).toInstant(); + if (precision == 0) { + return instant.getEpochSecond(); + } else if (precision >= 1 && precision <= 3) { + return instant.getEpochSecond() * 1_000 + instant.getNano() / 1_000_000; + } else if (precision >= 4 && precision <= 6) { + return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1_000; + } else { + return instant.getEpochSecond() * 1_000_000_000 + instant.getNano(); + } + } } diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ToFieldTypeVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ToFieldTypeVisitor.java deleted file mode 100644 index a6f303378987..000000000000 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ToFieldTypeVisitor.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.arrow; - -import org.apache.paimon.types.ArrayType; -import org.apache.paimon.types.BigIntType; -import org.apache.paimon.types.BinaryType; -import org.apache.paimon.types.BooleanType; -import org.apache.paimon.types.CharType; -import org.apache.paimon.types.DataTypeVisitor; -import org.apache.paimon.types.DateType; -import org.apache.paimon.types.DecimalType; -import org.apache.paimon.types.DoubleType; -import org.apache.paimon.types.FloatType; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.LocalZonedTimestampType; -import org.apache.paimon.types.MapType; -import org.apache.paimon.types.MultisetType; -import org.apache.paimon.types.RowType; -import org.apache.paimon.types.SmallIntType; -import org.apache.paimon.types.TimeType; -import org.apache.paimon.types.TimestampType; -import org.apache.paimon.types.TinyIntType; -import org.apache.paimon.types.VarBinaryType; -import org.apache.paimon.types.VarCharType; - -import org.apache.arrow.vector.types.Types; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.FieldType; - -/** Convert Paimon type to {@link FieldType}. */ -public class ToFieldTypeVisitor implements DataTypeVisitor { - - public static final ToFieldTypeVisitor INSTANCE = new ToFieldTypeVisitor(); - - @Override - public FieldType visit(CharType charType) { - return new FieldType(charType.isNullable(), Types.MinorType.VARCHAR.getType(), null); - } - - @Override - public FieldType visit(VarCharType varCharType) { - return new FieldType(varCharType.isNullable(), Types.MinorType.VARCHAR.getType(), null); - } - - @Override - public FieldType visit(BooleanType booleanType) { - return new FieldType(booleanType.isNullable(), Types.MinorType.BIT.getType(), null); - } - - @Override - public FieldType visit(BinaryType binaryType) { - return new FieldType(binaryType.isNullable(), Types.MinorType.VARBINARY.getType(), null); - } - - @Override - public FieldType visit(VarBinaryType varBinaryType) { - return new FieldType(varBinaryType.isNullable(), Types.MinorType.VARBINARY.getType(), null); - } - - @Override - public FieldType visit(DecimalType decimalType) { - return new FieldType( - decimalType.isNullable(), - new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128), - null); - } - - @Override - public FieldType visit(TinyIntType tinyIntType) { - return new FieldType(tinyIntType.isNullable(), Types.MinorType.TINYINT.getType(), null); - } - - @Override - public FieldType visit(SmallIntType smallIntType) { - return new FieldType(smallIntType.isNullable(), Types.MinorType.SMALLINT.getType(), null); - } - - @Override - public FieldType visit(IntType intType) { - return new FieldType(intType.isNullable(), Types.MinorType.INT.getType(), null); - } - - @Override - public FieldType visit(BigIntType bigIntType) { - return new FieldType(bigIntType.isNullable(), Types.MinorType.BIGINT.getType(), null); - } - - @Override - public FieldType visit(FloatType floatType) { - return new FieldType(floatType.isNullable(), Types.MinorType.FLOAT4.getType(), null); - } - - @Override - public FieldType visit(DoubleType doubleType) { - return new FieldType(doubleType.isNullable(), Types.MinorType.FLOAT8.getType(), null); - } - - @Override - public FieldType visit(DateType dateType) { - return new FieldType(dateType.isNullable(), Types.MinorType.DATEDAY.getType(), null); - } - - @Override - public FieldType visit(TimeType timeType) { - return new FieldType(timeType.isNullable(), Types.MinorType.TIMEMILLI.getType(), null); - } - - @Override - public FieldType visit(TimestampType timestampType) { - return new FieldType( - timestampType.isNullable(), Types.MinorType.TIMESTAMPNANO.getType(), null); - } - - @Override - public FieldType visit(LocalZonedTimestampType localZonedTimestampType) { - return new FieldType( - localZonedTimestampType.isNullable(), - Types.MinorType.TIMESTAMPNANO.getType(), - null); - } - - @Override - public FieldType visit(ArrayType arrayType) { - return new FieldType(arrayType.isNullable(), Types.MinorType.LIST.getType(), null); - } - - @Override - public FieldType visit(MultisetType multisetType) { - throw new UnsupportedOperationException("Doesn't support MultisetType."); - } - - @Override - public FieldType visit(MapType mapType) { - return new FieldType(mapType.isNullable(), new ArrowType.Map(false), null); - } - - @Override - public FieldType visit(RowType rowType) { - return new FieldType(rowType.isNullable(), Types.MinorType.STRUCT.getType(), null); - } -} diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java index 100c9eb20d05..f5e56e99bfc6 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java @@ -18,6 +18,7 @@ package org.apache.paimon.arrow.writer; +import org.apache.paimon.arrow.ArrowUtils; import org.apache.paimon.data.DataGetters; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; @@ -50,7 +51,7 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.TimeMilliVector; -import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; @@ -61,7 +62,6 @@ import javax.annotation.Nullable; import java.math.BigDecimal; -import java.time.Instant; import java.time.ZoneId; /** Registry of {@link ArrowFieldWriter}s. */ @@ -464,36 +464,27 @@ protected void doWrite( @Nullable int[] pickedInColumn, int startIndex, int batchRows) { - TimeStampNanoVector timeStampNanoVector = (TimeStampNanoVector) fieldVector; + TimeStampVector timeStampVector = (TimeStampVector) fieldVector; for (int i = 0; i < batchRows; i++) { int row = getRowNumber(startIndex, i, pickedInColumn); if (columnVector.isNullAt(row)) { - timeStampNanoVector.setNull(i); + timeStampVector.setNull(i); } else { Timestamp timestamp = ((TimestampColumnVector) columnVector).getTimestamp(row, precision); - long value = timestampToEpochNano(timestamp); - timeStampNanoVector.setSafe(i, value); + long value = ArrowUtils.timestampToEpoch(timestamp, precision, castZoneId); + timeStampVector.setSafe(i, value); } } } @Override protected void doWrite(int rowIndex, DataGetters getters, int pos) { - TimeStampNanoVector timeStampNanoVector = (TimeStampNanoVector) fieldVector; + TimeStampVector timeStampNanoVector = (TimeStampVector) fieldVector; Timestamp timestamp = getters.getTimestamp(pos, precision); - long value = timestampToEpochNano(timestamp); + long value = ArrowUtils.timestampToEpoch(timestamp, precision, castZoneId); timeStampNanoVector.setSafe(rowIndex, value); } - - private long timestampToEpochNano(Timestamp timestamp) { - if (castZoneId != null) { - Instant instant = timestamp.toLocalDateTime().atZone(castZoneId).toInstant(); - return instant.getEpochSecond() * 1_000_000_000 + instant.getNano(); - } else { - return timestamp.getMillisecond() * 1_000_000 + timestamp.getNanoOfMillisecond(); - } - } } /** Writer for ARRAY. */ diff --git a/paimon-arrow/src/test/java/ArrowWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/writer/ArrowWriterTest.java similarity index 99% rename from paimon-arrow/src/test/java/ArrowWriterTest.java rename to paimon-arrow/src/test/java/org/apache/paimon/arrow/writer/ArrowWriterTest.java index 016f10fc800e..001f711d5271 100644 --- a/paimon-arrow/src/test/java/ArrowWriterTest.java +++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/writer/ArrowWriterTest.java @@ -16,12 +16,10 @@ * limitations under the License. */ +package org.apache.paimon.arrow.writer; + import org.apache.paimon.CoreOptions; import org.apache.paimon.arrow.ArrowUtils; -import org.apache.paimon.arrow.writer.ArrowBatchWriter; -import org.apache.paimon.arrow.writer.ArrowFieldWriter; -import org.apache.paimon.arrow.writer.ArrowRowWriter; -import org.apache.paimon.arrow.writer.ArrowWriter; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; @@ -47,6 +45,7 @@ import org.apache.paimon.testutils.junit.parameterized.Parameters; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.DateTimeUtils; @@ -970,10 +969,11 @@ private String paimonObjectsToString(Object[] values, RowType rowType) { DateTimeUtils.toLocalDateTime( (int) value, DateTimeUtils.UTC_ZONE.toZoneId())); continue; - case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - // transfer Timestamp to LocalDateTime string - sb.append(((Timestamp) value).toLocalDateTime()); + // transfer Timestamp to epoch value + Timestamp timestamp = (Timestamp) value; + LocalZonedTimestampType type = (LocalZonedTimestampType) rowType.getTypeAt(i); + sb.append(ArrowUtils.timestampToEpoch(timestamp, type.getPrecision(), null)); continue; default: sb.append(value); From 85765203e64e621c29863500f6e03edd8db91c81 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 16 Aug 2024 11:12:19 +0800 Subject: [PATCH 4/6] fix --- .../org/apache/paimon/arrow/ArrowFieldTypeConversion.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java index e8079fe4d8c4..c4c6d4bc7860 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java @@ -46,7 +46,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.FieldType; -import java.util.TimeZone; +import java.time.ZoneId; /** Utils for conversion between Paimon {@link DataType} and Arrow {@link FieldType}. */ public class ArrowFieldTypeConversion { @@ -145,7 +145,7 @@ public FieldType visit(LocalZonedTimestampType localZonedTimestampType) { int precision = localZonedTimestampType.getPrecision(); TimeUnit timeUnit = getTimeUnit(precision); ArrowType arrowType = - new ArrowType.Timestamp(timeUnit, TimeZone.getDefault().toString()); + new ArrowType.Timestamp(timeUnit, ZoneId.systemDefault().toString()); return new FieldType(localZonedTimestampType.isNullable(), arrowType, null); } From c5be77402560301db026b505b77036f04f8e9b8e Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 16 Aug 2024 11:24:40 +0800 Subject: [PATCH 5/6] fix --- .../java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java index c4c6d4bc7860..69b42c9e85e0 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java @@ -54,7 +54,7 @@ public class ArrowFieldTypeConversion { public static final ArrowFieldTypeVisitor ARROW_FIELD_TYPE_VISITOR = new ArrowFieldTypeVisitor(); - private static class ArrowFieldTypeVisitor implements DataTypeVisitor { + public static class ArrowFieldTypeVisitor implements DataTypeVisitor { @Override public FieldType visit(CharType charType) { From b1b3fc6bf34c80faffd5456f92e68da417394a29 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 16 Aug 2024 11:29:06 +0800 Subject: [PATCH 6/6] fix --- .../java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java index 69b42c9e85e0..6572fcadb9d9 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java @@ -54,6 +54,7 @@ public class ArrowFieldTypeConversion { public static final ArrowFieldTypeVisitor ARROW_FIELD_TYPE_VISITOR = new ArrowFieldTypeVisitor(); + /** Convert {@link DataType} to {@link FieldType}. */ public static class ArrowFieldTypeVisitor implements DataTypeVisitor { @Override