Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Aug 2, 2024
1 parent 8db12ef commit 68df379
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.utils.BooleanArrayList;
import org.apache.paimon.utils.LongArrayList;

import java.util.Arrays;

import static java.lang.String.format;

/** Utils to calculate nested type position. */
Expand Down Expand Up @@ -109,8 +111,18 @@ public static CollectionPosition calculateCollectionOffsets(
if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
boolean isNull =
isOptionalFieldValueNull(definitionLevels[i], collectionDefinitionLevel);
nullCollectionFlags.add(isNull);
nullValuesCount += isNull ? 1 : 0;
if (isNull) {
nullCollectionFlags.add(true);
nullValuesCount++;
// 1. don't increase offset for null values
// 2. offsets and emptyCollectionFlags are meaningless for null values, but they
// must be set at each index for calculating lengths later
offsets.add(offset);
emptyCollectionFlags.add(false);
continue;
}

nullCollectionFlags.add(false);
// definitionLevels[i] > collectionDefinitionLevel => Collection is defined and not
// empty
// definitionLevels[i] == collectionDefinitionLevel => Collection is defined but
Expand All @@ -119,21 +131,22 @@ public static CollectionPosition calculateCollectionOffsets(
emptyCollectionFlags.add(false);
offset += getCollectionSize(repetitionLevels, collectionRepetitionLevel, i + 1);
} else if (definitionLevels[i] == collectionDefinitionLevel) {
offset++;
// don't increase offset for empty values
emptyCollectionFlags.add(true);
} else {
offset++;
emptyCollectionFlags.add(false);
throw new IllegalStateException(
String.format(
"This case should be handled as null value. "
+ "index: %d, definitionLevels: %s, collectionDefinitionLevel: %s.",
i,
Arrays.toString(definitionLevels),
collectionDefinitionLevel));
}
offsets.add(offset);
} else {
// when definitionLevels[i] < collectionDefinitionLevel - 1, it means the collection
// is
// not defined, but we need to regard it as null to avoid getting value wrong.
nullCollectionFlags.add(true);
nullValuesCount++;
offsets.add(++offset);
emptyCollectionFlags.add(false);
// is not defined, just ignore it
valueCount--;
}
}
long[] offsetsArray = offsets.toArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,12 @@ public WritableColumnVector readAndNewVector(int readNumber, WritableColumnVecto
// repeated type need two loops to read data.
while (!eof && index < readNumber) {
do {
valueList.add(lastValue);
valueIndex++;
// Children ColumnVector for OrcArrayColumnVector and OrcMapColumnVector don't set
// null value. To stay consistent, skip null value here.
if (lastValue != null) {
valueList.add(lastValue);
valueIndex++;
}
} while (readValue() && (repetitionLevel != 0));
index++;
}
Expand Down Expand Up @@ -349,96 +353,59 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
HeapBytesVector heapBytesVector = new HeapBytesVector(total);
for (int i = 0; i < valueList.size(); i++) {
byte[] src = ((List<byte[]>) valueList).get(i);
if (src == null) {
heapBytesVector.setNullAt(i);
} else {
heapBytesVector.appendBytes(i, src, 0, src.length);
}
heapBytesVector.appendBytes(i, src, 0, src.length);
}
return heapBytesVector;
case BOOLEAN:
HeapBooleanVector heapBooleanVector = new HeapBooleanVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
heapBooleanVector.setNullAt(i);
} else {
heapBooleanVector.vector[i] = ((List<Boolean>) valueList).get(i);
}
heapBooleanVector.vector[i] = ((List<Boolean>) valueList).get(i);
}
return heapBooleanVector;
case TINYINT:
HeapByteVector heapByteVector = new HeapByteVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
heapByteVector.setNullAt(i);
} else {
heapByteVector.vector[i] =
(byte) ((List<Integer>) valueList).get(i).intValue();
}
heapByteVector.vector[i] = (byte) ((List<Integer>) valueList).get(i).intValue();
}
return heapByteVector;
case SMALLINT:
HeapShortVector heapShortVector = new HeapShortVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
heapShortVector.setNullAt(i);
} else {
heapShortVector.vector[i] =
(short) ((List<Integer>) valueList).get(i).intValue();
}
heapShortVector.vector[i] =
(short) ((List<Integer>) valueList).get(i).intValue();
}
return heapShortVector;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
HeapIntVector heapIntVector = new HeapIntVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
heapIntVector.setNullAt(i);
} else {
heapIntVector.vector[i] = ((List<Integer>) valueList).get(i);
}
heapIntVector.vector[i] = ((List<Integer>) valueList).get(i);
}
return heapIntVector;
case FLOAT:
HeapFloatVector heapFloatVector = new HeapFloatVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
heapFloatVector.setNullAt(i);
} else {
heapFloatVector.vector[i] = ((List<Float>) valueList).get(i);
}
heapFloatVector.vector[i] = ((List<Float>) valueList).get(i);
}
return heapFloatVector;
case BIGINT:
HeapLongVector heapLongVector = new HeapLongVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
heapLongVector.setNullAt(i);
} else {
heapLongVector.vector[i] = ((List<Long>) valueList).get(i);
}
heapLongVector.vector[i] = ((List<Long>) valueList).get(i);
}
return heapLongVector;
case DOUBLE:
HeapDoubleVector heapDoubleVector = new HeapDoubleVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
heapDoubleVector.setNullAt(i);
} else {
heapDoubleVector.vector[i] = ((List<Double>) valueList).get(i);
}
heapDoubleVector.vector[i] = ((List<Double>) valueList).get(i);
}
return heapDoubleVector;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
HeapTimestampVector heapTimestampVector = new HeapTimestampVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
heapTimestampVector.setNullAt(i);
} else {
heapTimestampVector.setTimestamp(i, ((List<Timestamp>) valueList).get(i));
}
heapTimestampVector.setTimestamp(i, ((List<Timestamp>) valueList).get(i));
}
return heapTimestampVector;
case DECIMAL:
Expand All @@ -448,21 +415,13 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
case INT32:
HeapIntVector phiv = new HeapIntVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
phiv.setNullAt(i);
} else {
phiv.vector[i] = ((List<Integer>) valueList).get(i);
}
phiv.vector[i] = ((List<Integer>) valueList).get(i);
}
return new ParquetDecimalVector(phiv);
case INT64:
HeapLongVector phlv = new HeapLongVector(total);
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
phlv.setNullAt(i);
} else {
phlv.vector[i] = ((List<Long>) valueList).get(i);
}
phlv.vector[i] = ((List<Long>) valueList).get(i);
}
return new ParquetDecimalVector(phlv);
default:
Expand Down
Loading

0 comments on commit 68df379

Please sign in to comment.