diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java index 702877642327..f0e82eac4fb1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java @@ -25,7 +25,8 @@ import java.util.Arrays; /** Heap vector that nullable shared structure. */ -public abstract class AbstractHeapVector extends AbstractWritableVector { +public abstract class AbstractHeapVector extends AbstractWritableVector + implements ElementCountable { public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; @@ -116,6 +117,7 @@ public HeapIntVector getDictionaryIds() { return dictionaryIds; } + @Override public int getLen() { return this.len; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java similarity index 60% rename from paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java rename to paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java index fb6378349007..a32762d659fd 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java @@ -16,25 +16,10 @@ * limitations under the License. */ -package org.apache.paimon.format.parquet.position; +package org.apache.paimon.data.columnar.heap; -import javax.annotation.Nullable; +/** Container with a known number of elements. */ +public interface ElementCountable { -/** To represent struct's position in repeated type. */ -public class RowPosition { - @Nullable private final boolean[] isNull; - private final int positionsCount; - - public RowPosition(boolean[] isNull, int positionsCount) { - this.isNull = isNull; - this.positionsCount = positionsCount; - } - - public boolean[] getIsNull() { - return isNull; - } - - public int getPositionsCount() { - return positionsCount; - } + int getLen(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index c30e6cd5612d..cdc114b048a1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -34,6 +34,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.math.BigDecimal; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -573,6 +574,24 @@ public void testCountStarPK() { validateCount1NotPushDown(sql); } + @Test + public void testParquetRowDecimalAndTimestamp() { + sql( + "CREATE TABLE parquet_row_decimal(`row` ROW) WITH ('file.format' = 'parquet')"); + sql("INSERT INTO parquet_row_decimal VALUES ( (ROW(1.2)) )"); + + assertThat(sql("SELECT * FROM parquet_row_decimal")) + .containsExactly(Row.of(Row.of(new BigDecimal("1.2")))); + + sql( + "CREATE TABLE parquet_row_timestamp(`row` ROW) WITH ('file.format' = 'parquet')"); + sql("INSERT INTO parquet_row_timestamp VALUES ( (ROW(TIMESTAMP'2024-11-13 18:00:00')) )"); + + assertThat(sql("SELECT * FROM parquet_row_timestamp")) + .containsExactly( + Row.of(Row.of(DateTimeUtils.toLocalDateTime("2024-11-13 18:00:00", 0)))); + } + private void validateCount1PushDown(String sql) { Transformation transformation = AbstractTestBase.translate(tEnv, sql); while (!transformation.getInputs().isEmpty()) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 53b4b1634b5f..f0151d6f3d8f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnarRow; import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.data.columnar.heap.ElementCountable; import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.parquet.reader.ColumnReader; @@ -293,7 +294,10 @@ private VectorizedColumnBatch createVectorizedColumnBatch( for (int i = 0; i < writableVectors.length; i++) { switch (projectedFields[i].type().getTypeRoot()) { case DECIMAL: - vectors[i] = new ParquetDecimalVector(writableVectors[i]); + vectors[i] = + new ParquetDecimalVector( + writableVectors[i], + ((ElementCountable) writableVectors[i]).getLen()); break; case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index c89c77603dac..68225fbd1320 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.columnar.ColumnVector; import org.apache.paimon.data.columnar.heap.AbstractHeapVector; +import org.apache.paimon.data.columnar.heap.ElementCountable; import org.apache.paimon.data.columnar.heap.HeapArrayVector; import org.apache.paimon.data.columnar.heap.HeapMapVector; import org.apache.paimon.data.columnar.heap.HeapRowVector; @@ -134,7 +135,7 @@ private Pair readRow( String.format("Row field does not have any children: %s.", field)); } - int len = ((AbstractHeapVector) finalChildrenVectors[0]).getLen(); + int len = ((ElementCountable) finalChildrenVectors[0]).getLen(); boolean[] isNull = new boolean[len]; Arrays.fill(isNull, true); boolean hasNull = false; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index 7ee33a0bb5cc..7d00ff79234a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -495,7 +495,7 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { phiv.vector[i] = ((List) valueList).get(i); } } - return new ParquetDecimalVector(phiv); + return new ParquetDecimalVector(phiv, total); case INT64: HeapLongVector phlv = new HeapLongVector(total); for (int i = 0; i < valueList.size(); i++) { @@ -505,10 +505,10 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { phlv.vector[i] = ((List) valueList).get(i); } } - return new ParquetDecimalVector(phlv); + return new ParquetDecimalVector(phlv, total); default: HeapBytesVector phbv = getHeapBytesVector(total, valueList); - return new ParquetDecimalVector(phbv); + return new ParquetDecimalVector(phbv, total); } default: throw new RuntimeException("Unsupported type in the list: " + type); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java index 28d308bac61f..42714ab066da 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.columnar.Dictionary; import org.apache.paimon.data.columnar.IntColumnVector; import org.apache.paimon.data.columnar.LongColumnVector; +import org.apache.paimon.data.columnar.heap.ElementCountable; import org.apache.paimon.data.columnar.writable.WritableBytesVector; import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.data.columnar.writable.WritableIntVector; @@ -38,12 +39,18 @@ * {@link DecimalColumnVector} interface. */ public class ParquetDecimalVector - implements DecimalColumnVector, WritableLongVector, WritableIntVector, WritableBytesVector { + implements DecimalColumnVector, + WritableLongVector, + WritableIntVector, + WritableBytesVector, + ElementCountable { private final ColumnVector vector; + private final int len; - public ParquetDecimalVector(ColumnVector vector) { + public ParquetDecimalVector(ColumnVector vector, int len) { this.vector = vector; + this.len = len; } @Override @@ -225,4 +232,9 @@ public void fill(long value) { ((WritableLongVector) vector).fill(value); } } + + @Override + public int getLen() { + return len; + } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java deleted file mode 100644 index fa2da03ef312..000000000000 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.format.parquet.reader; - -import org.apache.paimon.data.columnar.heap.HeapRowVector; -import org.apache.paimon.data.columnar.writable.WritableColumnVector; - -import java.io.IOException; -import java.util.List; - -/** Row {@link ColumnReader}. */ -public class RowColumnReader implements ColumnReader { - - private final List fieldReaders; - - public RowColumnReader(List fieldReaders) { - this.fieldReaders = fieldReaders; - } - - @Override - public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { - HeapRowVector rowVector = (HeapRowVector) vector; - WritableColumnVector[] vectors = rowVector.getFields(); - // row vector null array - boolean[] isNulls = new boolean[readNumber]; - for (int i = 0; i < vectors.length; i++) { - fieldReaders.get(i).readToVector(readNumber, vectors[i]); - - for (int j = 0; j < readNumber; j++) { - if (i == 0) { - isNulls[j] = vectors[i].isNullAt(j); - } else { - isNulls[j] = isNulls[j] && vectors[i].isNullAt(j); - } - if (i == vectors.length - 1 && isNulls[j]) { - // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is - // null - rowVector.setNullAt(j); - } - } - } - } -}