Skip to content

Commit

Permalink
[WIP][parquet] Child vector of complex type should arrange elements c…
Browse files Browse the repository at this point in the history
…ompactly (like orc)
  • Loading branch information
yuzelin committed Aug 2, 2024
1 parent 39ca57d commit 3c15586
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.utils.BooleanArrayList;
import org.apache.paimon.utils.LongArrayList;

import java.util.Arrays;

import static java.lang.String.format;

/** Utils to calculate nested type position. */
Expand Down Expand Up @@ -105,12 +107,21 @@ public static CollectionPosition calculateCollectionOffsets(
for (int i = 0;
i < definitionLevels.length;
i = getNextCollectionStartIndex(repetitionLevels, collectionRepetitionLevel, i)) {
valueCount++;
if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
boolean isNull =
isOptionalFieldValueNull(definitionLevels[i], collectionDefinitionLevel);
nullCollectionFlags.add(isNull);
nullValuesCount += isNull ? 1 : 0;
if (isNull) {
nullCollectionFlags.add(true);
nullValuesCount++;
// 1. don't increase offset for null values
// 2. offsets and emptyCollectionFlags are meaningless for null values, but they
// must be set at each index for calculating lengths later
offsets.add(offset);
emptyCollectionFlags.add(false);
continue;
}

nullCollectionFlags.add(false);
// definitionLevels[i] > collectionDefinitionLevel => Collection is defined and not
// empty
// definitionLevels[i] == collectionDefinitionLevel => Collection is defined but
Expand All @@ -119,22 +130,22 @@ public static CollectionPosition calculateCollectionOffsets(
emptyCollectionFlags.add(false);
offset += getCollectionSize(repetitionLevels, collectionRepetitionLevel, i + 1);
} else if (definitionLevels[i] == collectionDefinitionLevel) {
offset++;
// don't increase offset for empty values
emptyCollectionFlags.add(true);
} else {
offset++;
emptyCollectionFlags.add(false);
throw new IllegalStateException(
String.format(
"This case should be handled as null value. "
+ "index: %d, definitionLevels: %s, collectionDefinitionLevel: %s.",
i,
Arrays.toString(definitionLevels),
collectionDefinitionLevel));
}
offsets.add(offset);
} else {
// when definitionLevels[i] < collectionDefinitionLevel - 1, it means the collection
// is
// not defined, but we need to regard it as null to avoid getting value wrong.
nullCollectionFlags.add(true);
nullValuesCount++;
offsets.add(++offset);
emptyCollectionFlags.add(false);
valueCount++;
}
// else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the
// collection is not defined, just ignore it
}
long[] offsetsArray = offsets.toArray();
long[] length = calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV

private boolean isFirstRow = true;

private Object lastValue;
private LastValueContainer lastValue = new LastValueContainer();

public NestedPrimitiveColumnReader(
ColumnDescriptor descriptor,
Expand Down Expand Up @@ -168,8 +168,10 @@ public WritableColumnVector readAndNewVector(int readNumber, WritableColumnVecto
// repeated type need two loops to read data.
while (!eof && index < readNumber) {
do {
valueList.add(lastValue);
valueIndex++;
if (!lastValue.shouldSkip) {
valueList.add(lastValue.value);
valueIndex++;
}
} while (readValue() && (repetitionLevel != 0));
index++;
}
Expand All @@ -187,6 +189,23 @@ public LevelDelegation getLevelDelegation() {
return new LevelDelegation(repetition, definition);
}

/**
* Example: {@code {[[0, null], [1], [], null], [], null}} yields the definition levels {@code [5, 4, 5, 3, 2, 1, 0]} (here maxDefLevel is 5).
*
* <ul>
* <li>{@code definitionLevel == maxDefLevel}: non-null value
* <li>{@code definitionLevel == maxDefLevel - 1}: null value
* <li>{@code definitionLevel == maxDefLevel - 2}: empty set, skipped
* <li>{@code definitionLevel == maxDefLevel - 3}: null set, skipped
* <li>{@code definitionLevel == maxDefLevel - 4}: empty outer set, skipped
* <li>{@code definitionLevel == maxDefLevel - 5}: null outer set, skipped
* <li>... any lower level is skipped as well
* </ul>
*
* <p>When {@code definitionLevel <= maxDefLevel - 2} the value is skipped, because the child
* ColumnVectors of OrcArrayColumnVector and OrcMapColumnVector do not contain entries for empty
* or null sets. This reader stays consistent with that layout.
*/
private boolean readValue() throws IOException {
int left = readPageIfNeed();
if (left > 0) {
Expand All @@ -196,12 +215,14 @@ private boolean readValue() throws IOException {
if (definitionLevel == maxDefLevel) {
if (isCurrentPageDictionaryEncoded) {
int dictionaryId = dataColumn.readValueDictionaryId();
lastValue = dictionaryDecodeValue(dataType, dictionaryId);
lastValue.setValue(dictionaryDecodeValue(dataType, dictionaryId));
} else {
lastValue = readPrimitiveTypedRow(dataType);
lastValue.setValue(readPrimitiveTypedRow(dataType));
}
} else if (definitionLevel == maxDefLevel - 1) {
lastValue.setValue(null);
} else {
lastValue = null;
lastValue.skip();
}
return true;
} else {
Expand Down Expand Up @@ -641,4 +662,18 @@ public int nextInt() {
return 0;
}
}

/**
 * Mutable holder for the most recently read value, paired with a flag that marks whether that
 * value should be skipped instead of being used.
 */
private static class LastValueContainer {
    /** Most recently stored value; may be {@code null}. Left untouched by {@link #skip()}. */
    protected Object value;

    /** {@code true} after {@link #skip()}, {@code false} after {@link #setValue(Object)}. */
    protected boolean shouldSkip;

    /** Marks the current entry as one to be skipped; the stored value is not modified. */
    protected void skip() {
        shouldSkip = true;
    }

    /**
     * Stores a new value and clears the skip flag.
     *
     * @param value the value to hold, possibly {@code null}
     */
    protected void setValue(Object value) {
        shouldSkip = false;
        this.value = value;
    }
}
}

0 comments on commit 3c15586

Please sign in to comment.