iterator;
+
+ /**
+ * Creates a writer over the given Arrow container.
+ *
+ * @param root the {@link VectorSchemaRoot} refilled by each {@code next} call; released in {@code close()}
+ * @param fieldWriters per-field writers that populate {@code root}'s vectors
+ */
+ ArrowWriter(VectorSchemaRoot root, ArrowFieldWriter[] fieldWriters) {
+ this.root = root;
+ this.fieldWriters = fieldWriters;
+ }
+
+ /**
+ * Write and get at most {@code maxBatchRows} data. Return null when finishing writing current
+ * iterator.
+ *
+ * <p>NOTE: the returned value will be reused, and its lifecycle is managed by this writer.
+ */
+ @Nullable
+ public VectorSchemaRoot next(int maxBatchRows) {
+ if (iterator == null) {
+ return null;
+ }
+
+ // Clear the per-field state left over from the previous batch before refilling the vectors.
+ for (ArrowFieldWriter fieldWriter : fieldWriters) {
+ fieldWriter.reset();
+ }
+ doWrite(maxBatchRows);
+ return root;
+ }
+
+ /**
+ * Fills the field writers with up to {@code maxBatchRows} rows from the current iterator.
+ * Implementations are expected to call {@link #releaseIterator()} once the iterator is
+ * exhausted so that subsequent {@code next} calls return null.
+ */
+ protected abstract void doWrite(int maxBatchRows);
+
+ /** Releases the Arrow buffers held by {@code root}. */
+ public void close() {
+ root.close();
+ }
+
+ /**
+ * Releases the current batch and marks the iterator as exhausted, so the next call to
+ * {@code next(int)} returns null.
+ */
+ protected void releaseIterator() {
+ iterator.releaseBatch();
+ iterator = null;
+ }
+}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/writer/ArrowWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/writer/ArrowWriterTest.java
new file mode 100644
index 000000000000..001f711d5271
--- /dev/null
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/writer/ArrowWriterTest.java
@@ -0,0 +1,1015 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.writer;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.arrow.ArrowUtils;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.deletionvectors.ApplyDeletionFileRecordIterator;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.VectorizedRecordIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.JsonStringArrayList;
+import org.apache.arrow.vector.util.JsonStringHashMap;
+import org.apache.arrow.vector.util.Text;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/** UT for {@link ArrowWriter}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class ArrowWriterTest {
+
+ private static final Random RND = ThreadLocalRandom.current();
+ private @TempDir java.nio.file.Path tempDir;
+ private final String testMode;
+ private Catalog catalog;
+
+ private static final boolean[] NULLABLE;
+ private static final RowType PRIMITIVE_TYPE;
+
+ static {
+ // Randomize nullability per field so both nullable and non-nullable code
+ // paths are exercised across test runs.
+ int cnt = 18;
+ NULLABLE = new boolean[cnt];
+ for (int i = 0; i < cnt; i++) {
+ NULLABLE[i] = RND.nextBoolean();
+ }
+
+ // Typed List<DataField> instead of a raw List: avoids an unchecked
+ // conversion when building the RowType below.
+ List<DataField> dataFields = new ArrayList<>();
+ dataFields.add(new DataField(0, "char", DataTypes.CHAR(10).copy(NULLABLE[0])));
+ dataFields.add(new DataField(1, "varchar", DataTypes.VARCHAR(20).copy(NULLABLE[1])));
+ dataFields.add(new DataField(2, "boolean", DataTypes.BOOLEAN().copy(NULLABLE[2])));
+ dataFields.add(new DataField(3, "binary", DataTypes.BINARY(10).copy(NULLABLE[3])));
+ dataFields.add(new DataField(4, "varbinary", DataTypes.VARBINARY(20).copy(NULLABLE[4])));
+ dataFields.add(new DataField(5, "decimal1", DataTypes.DECIMAL(2, 2).copy(NULLABLE[5])));
+ dataFields.add(new DataField(6, "decimal2", DataTypes.DECIMAL(38, 2).copy(NULLABLE[6])));
+ dataFields.add(new DataField(7, "decimal3", DataTypes.DECIMAL(10, 1).copy(NULLABLE[7])));
+ dataFields.add(new DataField(8, "tinyint", DataTypes.TINYINT().copy(NULLABLE[8])));
+ dataFields.add(new DataField(9, "smallint", DataTypes.SMALLINT().copy(NULLABLE[9])));
+ dataFields.add(new DataField(10, "int", DataTypes.INT().copy(NULLABLE[10])));
+ dataFields.add(new DataField(11, "bigint", DataTypes.BIGINT().copy(NULLABLE[11])));
+ dataFields.add(new DataField(12, "float", DataTypes.FLOAT().copy(NULLABLE[12])));
+ dataFields.add(new DataField(13, "double", DataTypes.DOUBLE().copy(NULLABLE[13])));
+ dataFields.add(new DataField(14, "date", DataTypes.DATE().copy(NULLABLE[14])));
+ dataFields.add(new DataField(15, "timestamp3", DataTypes.TIMESTAMP(3).copy(NULLABLE[15])));
+ dataFields.add(new DataField(16, "timestamp6", DataTypes.TIMESTAMP(6).copy(NULLABLE[16])));
+ dataFields.add(
+ new DataField(
+ 17,
+ "timestampLZ9",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).copy(NULLABLE[17])));
+ PRIMITIVE_TYPE = new RowType(dataFields);
+ }
+
+ /** @param testMode one of the modes supplied by {@code testMode()}. */
+ public ArrowWriterTest(String testMode) {
+ this.testMode = testMode;
+ }
+
+ /**
+ * Parameterized test modes: vectorized read without deletion vectors, row-by-row read,
+ * and vectorized read with deletion vectors.
+ */
+ @SuppressWarnings("unused")
+ @Parameters(name = "test-mode = {0}")
+ public static List<String> testMode() {
+ // List<String> instead of a raw List: the values are all Strings, and the
+ // typed return avoids an unchecked conversion at the call sites.
+ return Arrays.asList("vectorized_without_dv", "per_row", "vectorized_with_dv");
+ }
+
+ /**
+ * Skips the current test unless the deletion-vector expectation matches the active mode
+ * (only "vectorized_with_dv" exercises deletion vectors).
+ */
+ private void testDv(boolean testDv) {
+ assumeThat(testMode.equals("vectorized_with_dv")).isEqualTo(testDv);
+ }
+
+ /** Creates a fresh catalog rooted at the temp dir and a "default" database before each test. */
+ @BeforeEach
+ public void reset() throws Exception {
+ catalog =
+ CatalogFactory.createCatalog(
+ CatalogContext.create(new Path(tempDir.toUri().toString())));
+ catalog.createDatabase("default", false);
+ }
+
+ private RecordReader.RecordIterator createPrimitiveIterator(
+ List