Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Dec 20, 2024
1 parent bb9bfbc commit 1a3cebc
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import java.util.Arrays;

/** Base class for heap vectors that holds the shared nullable structure. */
public abstract class AbstractHeapVector extends AbstractWritableVector {
public abstract class AbstractHeapVector extends AbstractWritableVector
implements ElementCountable {

public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

Expand Down Expand Up @@ -116,6 +117,7 @@ public HeapIntVector getDictionaryIds() {
return dictionaryIds;
}

/**
 * Reports the length recorded for this vector.
 *
 * @return the current value of the {@code len} field
 */
@Override
public int getLen() {
    return len;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data.columnar.heap;

/**
 * Implemented by containers that can report how many elements they hold.
 */
public interface ElementCountable {

    /**
     * Returns the element count of this container.
     *
     * @return the number of elements
     */
    int getLen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
Expand Down Expand Up @@ -294,7 +295,10 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
for (int i = 0; i < writableVectors.length; i++) {
switch (projectedFields[i].type().getTypeRoot()) {
case DECIMAL:
vectors[i] = new ParquetDecimalVector(writableVectors[i]);
vectors[i] =
new ParquetDecimalVector(
writableVectors[i],
((ElementCountable) writableVectors[i]).getLen());
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.heap.AbstractHeapVector;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.heap.HeapArrayVector;
import org.apache.paimon.data.columnar.heap.HeapMapVector;
import org.apache.paimon.data.columnar.heap.HeapRowVector;
Expand All @@ -41,6 +42,7 @@
import org.apache.parquet.column.page.PageReadStore;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -112,11 +114,30 @@ private Pair<LevelDelegation, WritableColumnVector> readRow(
WritableColumnVector[] childrenVectors = heapRowVector.getFields();
WritableColumnVector[] finalChildrenVectors =
new WritableColumnVector[childrenVectors.length];

int len = -1;
boolean[] isNull = null;
boolean hasNull = false;

for (int i = 0; i < children.size(); i++) {
Pair<LevelDelegation, WritableColumnVector> tuple =
readData(children.get(i), readNumber, childrenVectors[i], true);
levelDelegation = tuple.getLeft();
finalChildrenVectors[i] = tuple.getRight();

WritableColumnVector writableColumnVector = tuple.getRight();
if (len == -1) {
len = ((ElementCountable) writableColumnVector).getLen();
isNull = new boolean[len];
Arrays.fill(isNull, true);
}

for (int j = 0; j < len; j++) {
isNull[j] = isNull[j] && writableColumnVector.isNullAt(j);
if (isNull[j]) {
hasNull = true;
}
}
}
if (levelDelegation == null) {
throw new RuntimeException(
Expand All @@ -138,8 +159,8 @@ private Pair<LevelDelegation, WritableColumnVector> readRow(
heapRowVector.setFields(finalChildrenVectors);
}

if (rowPosition.getIsNull() != null) {
setFieldNullFalg(rowPosition.getIsNull(), heapRowVector);
if (hasNull) {
setFieldNullFalg(isNull, heapRowVector);
}
return Pair.of(levelDelegation, heapRowVector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
phiv.vector[i] = ((List<Integer>) valueList).get(i);
}
}
return new ParquetDecimalVector(phiv);
return new ParquetDecimalVector(phiv, total);
case INT64:
HeapLongVector phlv = new HeapLongVector(total);
for (int i = 0; i < valueList.size(); i++) {
Expand All @@ -499,10 +499,10 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
phlv.vector[i] = ((List<Long>) valueList).get(i);
}
}
return new ParquetDecimalVector(phlv);
return new ParquetDecimalVector(phlv, total);
default:
HeapBytesVector phbv = getHeapBytesVector(total, valueList);
return new ParquetDecimalVector(phbv);
return new ParquetDecimalVector(phbv, total);
}
default:
throw new RuntimeException("Unsupported type in the list: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.data.columnar.Dictionary;
import org.apache.paimon.data.columnar.IntColumnVector;
import org.apache.paimon.data.columnar.LongColumnVector;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
Expand All @@ -38,12 +39,18 @@
* {@link DecimalColumnVector} interface.
*/
public class ParquetDecimalVector
implements DecimalColumnVector, WritableLongVector, WritableIntVector, WritableBytesVector {
implements DecimalColumnVector,
WritableLongVector,
WritableIntVector,
WritableBytesVector,
ElementCountable {

private final ColumnVector vector;
private final int len;

public ParquetDecimalVector(ColumnVector vector) {
public ParquetDecimalVector(ColumnVector vector, int len) {
this.vector = vector;
this.len = len;
}

@Override
Expand Down Expand Up @@ -225,4 +232,9 @@ public void fill(long value) {
((WritableLongVector) vector).fill(value);
}
}

/**
 * Reports the length that was supplied when this vector was constructed.
 *
 * @return the value stored in the {@code len} field
 */
@Override
public int getLen() {
    return this.len;
}
}

0 comments on commit 1a3cebc

Please sign in to comment.