From 1a3cebcc6850fd955a7d7ffe3fd9b71cb4da3eb1 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sat, 21 Dec 2024 00:43:49 +0800 Subject: [PATCH] Derive nested row null flags from child vectors in Parquet reader --- .../columnar/heap/AbstractHeapVector.java | 4 ++- .../data/columnar/heap/ElementCountable.java | 25 +++++++++++++++++++ .../format/parquet/ParquetReaderFactory.java | 6 ++++- .../parquet/reader/NestedColumnReader.java | 25 +++++++++++++++++-- .../reader/NestedPrimitiveColumnReader.java | 6 ++--- .../parquet/reader/ParquetDecimalVector.java | 16 ++++++++++-- 6 files changed, 73 insertions(+), 9 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java index 702877642327..f0e82eac4fb1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java @@ -25,7 +25,8 @@ import java.util.Arrays; /** Heap vector that nullable shared structure. 
*/ -public abstract class AbstractHeapVector extends AbstractWritableVector { +public abstract class AbstractHeapVector extends AbstractWritableVector + implements ElementCountable { public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; @@ -116,6 +117,7 @@ public HeapIntVector getDictionaryIds() { return dictionaryIds; } + @Override public int getLen() { return this.len; } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java new file mode 100644 index 000000000000..a32762d659fd --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.columnar.heap; + +/** Container with a known number of elements. 
*/ +public interface ElementCountable { + + int getLen(); +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 3f0ae426f528..6f8cab2202d6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnarRow; import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; +import org.apache.paimon.data.columnar.heap.ElementCountable; import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.parquet.reader.ColumnReader; @@ -294,7 +295,10 @@ private VectorizedColumnBatch createVectorizedColumnBatch( for (int i = 0; i < writableVectors.length; i++) { switch (projectedFields[i].type().getTypeRoot()) { case DECIMAL: - vectors[i] = new ParquetDecimalVector(writableVectors[i]); + vectors[i] = + new ParquetDecimalVector( + writableVectors[i], + ((ElementCountable) writableVectors[i]).getLen()); break; case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index 82d5543b13ea..3724014e6287 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.columnar.ColumnVector; import org.apache.paimon.data.columnar.heap.AbstractHeapVector; +import 
org.apache.paimon.data.columnar.heap.ElementCountable; import org.apache.paimon.data.columnar.heap.HeapArrayVector; import org.apache.paimon.data.columnar.heap.HeapMapVector; import org.apache.paimon.data.columnar.heap.HeapRowVector; @@ -41,6 +42,7 @@ import org.apache.parquet.column.page.PageReadStore; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -112,11 +114,30 @@ private Pair readRow( WritableColumnVector[] childrenVectors = heapRowVector.getFields(); WritableColumnVector[] finalChildrenVectors = new WritableColumnVector[childrenVectors.length]; + + int len = -1; + boolean[] isNull = null; + boolean hasNull = false; + for (int i = 0; i < children.size(); i++) { Pair tuple = readData(children.get(i), readNumber, childrenVectors[i], true); levelDelegation = tuple.getLeft(); finalChildrenVectors[i] = tuple.getRight(); + + WritableColumnVector writableColumnVector = tuple.getRight(); + if (len == -1) { + len = ((ElementCountable) writableColumnVector).getLen(); + isNull = new boolean[len]; + Arrays.fill(isNull, true); + } + + for (int j = 0; j < len; j++) { + isNull[j] = isNull[j] && writableColumnVector.isNullAt(j); + if (isNull[j]) { + hasNull = true; + } + } } if (levelDelegation == null) { throw new RuntimeException( @@ -138,8 +159,8 @@ private Pair readRow( heapRowVector.setFields(finalChildrenVectors); } - if (rowPosition.getIsNull() != null) { - setFieldNullFalg(rowPosition.getIsNull(), heapRowVector); + if (hasNull) { + setFieldNullFalg(isNull, heapRowVector); } return Pair.of(levelDelegation, heapRowVector); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index 6d9b4b1e374d..69b0fa574418 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -489,7 +489,7 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { phiv.vector[i] = ((List) valueList).get(i); } } - return new ParquetDecimalVector(phiv); + return new ParquetDecimalVector(phiv, total); case INT64: HeapLongVector phlv = new HeapLongVector(total); for (int i = 0; i < valueList.size(); i++) { @@ -499,10 +499,10 @@ private WritableColumnVector fillColumnVector(int total, List valueList) { phlv.vector[i] = ((List) valueList).get(i); } } - return new ParquetDecimalVector(phlv); + return new ParquetDecimalVector(phlv, total); default: HeapBytesVector phbv = getHeapBytesVector(total, valueList); - return new ParquetDecimalVector(phbv); + return new ParquetDecimalVector(phbv, total); } default: throw new RuntimeException("Unsupported type in the list: " + type); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java index 28d308bac61f..42714ab066da 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.columnar.Dictionary; import org.apache.paimon.data.columnar.IntColumnVector; import org.apache.paimon.data.columnar.LongColumnVector; +import org.apache.paimon.data.columnar.heap.ElementCountable; import org.apache.paimon.data.columnar.writable.WritableBytesVector; import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.data.columnar.writable.WritableIntVector; @@ -38,12 +39,18 @@ * {@link DecimalColumnVector} interface. 
*/ public class ParquetDecimalVector - implements DecimalColumnVector, WritableLongVector, WritableIntVector, WritableBytesVector { + implements DecimalColumnVector, + WritableLongVector, + WritableIntVector, + WritableBytesVector, + ElementCountable { private final ColumnVector vector; + private final int len; - public ParquetDecimalVector(ColumnVector vector) { + public ParquetDecimalVector(ColumnVector vector, int len) { this.vector = vector; + this.len = len; } @Override @@ -225,4 +232,9 @@ public void fill(long value) { ((WritableLongVector) vector).fill(value); } } + + @Override + public int getLen() { + return len; + } }