Skip to content

Commit

Permalink
[WIP][parquet] Child vector of complex type should arrange elements c…
Browse files Browse the repository at this point in the history
…ompactly (like Orc)
  • Loading branch information
yuzelin committed Aug 2, 2024
1 parent 68df379 commit fb8a8ff
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV

private boolean isFirstRow = true;

private Object lastValue;
private LastValueContainer lastValue = new LastValueContainer();

public NestedPrimitiveColumnReader(
ColumnDescriptor descriptor,
Expand Down Expand Up @@ -168,10 +168,8 @@ public WritableColumnVector readAndNewVector(int readNumber, WritableColumnVecto
// repeated type need two loops to read data.
while (!eof && index < readNumber) {
do {
// Children ColumnVector for OrcArrayColumnVector and OrcMapColumnVector don't set
// null value. To stay consistent, skip null value here.
if (lastValue != null) {
valueList.add(lastValue);
if (!lastValue.shouldSkip) {
valueList.add(lastValue.value);
valueIndex++;
}
} while (readValue() && (repetitionLevel != 0));
Expand All @@ -191,6 +189,23 @@ public LevelDelegation getLevelDelegation() {
return new LevelDelegation(repetition, definition);
}

/**
* Example: {[[0, null], [1], [], null], [], null} => [5, 4, 5, 3, 2, 1, 0]
*
* <ul>
* <li>definitionLevel == maxDefLevel => not null value
* <li>definitionLevel == maxDefLevel - 1 => null value
* <li>definitionLevel == maxDefLevel - 2 => empty set, skip
* <li>definitionLevel == maxDefLevel - 3 => null set, skip
* <li>definitionLevel == maxDefLevel - 4 => empty outer set, skip
* <li>definitionLevel == maxDefLevel - 5 => null outer set, skip
* <li>... skip
* </ul>
*
* When {@code definitionLevel <= maxDefLevel - 2} the value is skipped, because the child
* ColumnVectors of OrcArrayColumnVector and OrcMapColumnVector hold no entry for an empty or
* null set. This reader stays consistent with that layout.
*/
private boolean readValue() throws IOException {
int left = readPageIfNeed();
if (left > 0) {
Expand All @@ -200,12 +215,14 @@ private boolean readValue() throws IOException {
if (definitionLevel == maxDefLevel) {
if (isCurrentPageDictionaryEncoded) {
int dictionaryId = dataColumn.readValueDictionaryId();
lastValue = dictionaryDecodeValue(dataType, dictionaryId);
lastValue.setValue(dictionaryDecodeValue(dataType, dictionaryId));
} else {
lastValue = readPrimitiveTypedRow(dataType);
lastValue.setValue(readPrimitiveTypedRow(dataType));
}
} else if (definitionLevel == maxDefLevel - 1) {
lastValue.setValue(null);
} else {
lastValue = null;
lastValue.skip();
}
return true;
} else {
Expand Down Expand Up @@ -353,59 +370,96 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
HeapBytesVector heapBytesVector = new HeapBytesVector(total);
for (int i = 0; i < valueList.size(); i++) {
byte[] src = ((List<byte[]>) valueList).get(i);
heapBytesVector.appendBytes(i, src, 0, src.length);
if (src == null) {
heapBytesVector.setNullAt(i);
} else {
heapBytesVector.appendBytes(i, src, 0, src.length);
}
}
return heapBytesVector;
case BOOLEAN:
HeapBooleanVector heapBooleanVector = new HeapBooleanVector(total);
for (int i = 0; i < valueList.size(); i++) {
heapBooleanVector.vector[i] = ((List<Boolean>) valueList).get(i);
if (valueList.get(i) == null) {
heapBooleanVector.setNullAt(i);
} else {
heapBooleanVector.vector[i] = ((List<Boolean>) valueList).get(i);
}
}
return heapBooleanVector;
case TINYINT:
HeapByteVector heapByteVector = new HeapByteVector(total);
for (int i = 0; i < valueList.size(); i++) {
heapByteVector.vector[i] = (byte) ((List<Integer>) valueList).get(i).intValue();
if (valueList.get(i) == null) {
heapByteVector.setNullAt(i);
} else {
heapByteVector.vector[i] =
(byte) ((List<Integer>) valueList).get(i).intValue();
}
}
return heapByteVector;
case SMALLINT:
HeapShortVector heapShortVector = new HeapShortVector(total);
for (int i = 0; i < valueList.size(); i++) {
heapShortVector.vector[i] =
(short) ((List<Integer>) valueList).get(i).intValue();
if (valueList.get(i) == null) {
heapShortVector.setNullAt(i);
} else {
heapShortVector.vector[i] =
(short) ((List<Integer>) valueList).get(i).intValue();
}
}
return heapShortVector;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
HeapIntVector heapIntVector = new HeapIntVector(total);
for (int i = 0; i < valueList.size(); i++) {
heapIntVector.vector[i] = ((List<Integer>) valueList).get(i);
if (valueList.get(i) == null) {
heapIntVector.setNullAt(i);
} else {
heapIntVector.vector[i] = ((List<Integer>) valueList).get(i);
}
}
return heapIntVector;
case FLOAT:
HeapFloatVector heapFloatVector = new HeapFloatVector(total);
for (int i = 0; i < valueList.size(); i++) {
heapFloatVector.vector[i] = ((List<Float>) valueList).get(i);
if (valueList.get(i) == null) {
heapFloatVector.setNullAt(i);
} else {
heapFloatVector.vector[i] = ((List<Float>) valueList).get(i);
}
}
return heapFloatVector;
case BIGINT:
HeapLongVector heapLongVector = new HeapLongVector(total);
for (int i = 0; i < valueList.size(); i++) {
heapLongVector.vector[i] = ((List<Long>) valueList).get(i);
if (valueList.get(i) == null) {
heapLongVector.setNullAt(i);
} else {
heapLongVector.vector[i] = ((List<Long>) valueList).get(i);
}
}
return heapLongVector;
case DOUBLE:
HeapDoubleVector heapDoubleVector = new HeapDoubleVector(total);
for (int i = 0; i < valueList.size(); i++) {
heapDoubleVector.vector[i] = ((List<Double>) valueList).get(i);
if (valueList.get(i) == null) {
heapDoubleVector.setNullAt(i);
} else {
heapDoubleVector.vector[i] = ((List<Double>) valueList).get(i);
}
}
return heapDoubleVector;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
HeapTimestampVector heapTimestampVector = new HeapTimestampVector(total);
for (int i = 0; i < valueList.size(); i++) {
heapTimestampVector.setTimestamp(i, ((List<Timestamp>) valueList).get(i));
if (valueList.get(i) == null) {
heapTimestampVector.setNullAt(i);
} else {
heapTimestampVector.setTimestamp(i, ((List<Timestamp>) valueList).get(i));
}
}
return heapTimestampVector;
case DECIMAL:
Expand All @@ -415,13 +469,21 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
case INT32:
HeapIntVector phiv = new HeapIntVector(total);
for (int i = 0; i < valueList.size(); i++) {
phiv.vector[i] = ((List<Integer>) valueList).get(i);
if (valueList.get(i) == null) {
phiv.setNullAt(i);
} else {
phiv.vector[i] = ((List<Integer>) valueList).get(i);
}
}
return new ParquetDecimalVector(phiv);
case INT64:
HeapLongVector phlv = new HeapLongVector(total);
for (int i = 0; i < valueList.size(); i++) {
phlv.vector[i] = ((List<Long>) valueList).get(i);
if (valueList.get(i) == null) {
phlv.setNullAt(i);
} else {
phlv.vector[i] = ((List<Long>) valueList).get(i);
}
}
return new ParquetDecimalVector(phlv);
default:
Expand Down Expand Up @@ -600,4 +662,18 @@ public int nextInt() {
return 0;
}
}

/**
 * Mutable holder for the entry most recently produced by {@code readValue()}.
 *
 * <p>It separates two cases that a bare {@code Object} reference cannot tell apart: an entry
 * whose value is {@code null} (it is still appended to the value list) and an entry that must
 * not be appended at all ({@link #shouldSkip} is {@code true}).
 */
private static class LastValueContainer {
    /** {@code true} when the last entry must be left out of the value list. */
    protected boolean shouldSkip;

    /** The last value read; only meaningful while {@link #shouldSkip} is {@code false}. */
    protected Object value;

    /**
     * Records a value (possibly {@code null}) that should be appended to the value list.
     *
     * @param value the decoded value, or {@code null} for a null element
     */
    protected void setValue(Object value) {
        this.value = value;
        this.shouldSkip = false;
    }

    /** Marks the last entry as one that must not be appended to the value list. */
    protected void skip() {
        // Drop the previous value: a skipped entry must neither expose stale data nor keep the
        // old object reachable.
        this.value = null;
        this.shouldSkip = true;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testArrayString() throws IOException {

// TODO check PER-ROW

VectorizedColumnBatch batch = createVectorizedColumnBatch(nestedArrayType, rows);
VectorizedColumnBatch batch = createVectorizedColumnBatch(nestedArrayType, rows).batch();
ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0];

// check nullability
Expand Down Expand Up @@ -181,7 +181,7 @@ public void testArrayArrayString() throws IOException {
// TODO check per-row

// validate column vector
VectorizedColumnBatch batch = createVectorizedColumnBatch(nestedArrayType, rows);
VectorizedColumnBatch batch = createVectorizedColumnBatch(nestedArrayType, rows).batch();
ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0];

expectedData.validateOuterArray(
Expand All @@ -198,7 +198,45 @@ public void testArrayArrayString() throws IOException {
(cv, i) -> new String(((BytesColumnVector) cv).getBytes(i).getBytes()));
}

private VectorizedColumnBatch createVectorizedColumnBatch(
/**
 * Smoke test for ARRAY&lt;ARRAY&lt;INT&gt;&gt;: writes the rows produced by {@link #gen()} and reads
 * each of them back through the vectorized reader.
 *
 * @throws IOException if writing or reading the temporary file fails
 */
@Test
public void testArrayArrayInt() throws IOException {
    RowType nestedArrayType =
            RowType.builder()
                    .field("array_array_int", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
                    .build();

    List<InternalRow> writtenRows = gen();
    VectorizedRecordIterator iterator =
            createVectorizedColumnBatch(nestedArrayType, writtenRows);

    VectorizedColumnBatch batch = iterator.batch();
    if (!(batch.columns[0] instanceof ArrayColumnVector)) {
        throw new AssertionError(
                "Expected column 0 to be an ArrayColumnVector but was " + batch.columns[0]);
    }

    // TODO check per-row
    int readRows = 0;
    InternalRow row;
    // Stop when the iterator is exhausted rather than after a fixed count; the former
    // hard-coded bound of 10 exceeded the number of generated rows.
    // NOTE(review): relies on next() returning null once no rows remain — confirm.
    while ((row = iterator.next()) != null) {
        // One generated row holds a null array; only materialize the array when it is set.
        if (!row.isNullAt(0)) {
            row.getArray(0);
        }
        readRows++;
    }

    if (readRows != writtenRows.size()) {
        throw new AssertionError(
                "Expected " + writtenRows.size() + " rows but read " + readRows);
    }
}

/**
 * Builds the fixture rows {@code {[[0, null], [1], [], null], [], null}}.
 *
 * <p>The three rows are, in order: a nested array mixing a non-null value, a null element, an
 * empty inner array and a null inner array; an empty outer array; and a null outer array.
 *
 * @return a mutable list holding the three rows
 */
private List<InternalRow> gen() {
    GenericArray nestedArray =
            new GenericArray(
                    new Object[] {
                        new GenericArray(new Object[] {0, null}),
                        new GenericArray(new Object[] {1}),
                        new GenericArray(new Object[] {}),
                        null
                    });
    GenericArray emptyOuterArray = new GenericArray(new Object[] {});

    List<InternalRow> fixture = new ArrayList<>(3);
    fixture.add(GenericRow.of(nestedArray));
    fixture.add(GenericRow.of(emptyOuterArray));
    fixture.add(GenericRow.of((Object) null));
    return fixture;
}

private VectorizedRecordIterator createVectorizedColumnBatch(
RowType rowType, List<InternalRow> rows) throws IOException {
Path path = new Path(tempDir.toString(), UUID.randomUUID().toString());
LocalFileIO fileIO = LocalFileIO.create();
Expand All @@ -220,7 +258,7 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)));

RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
return ((VectorizedRecordIterator) iterator).batch();
return (VectorizedRecordIterator) iterator;
}

/** Store generated data of ARRAY[STRING] and provide validated methods. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ void testReadRowPositionWithRandomFilter() throws IOException {
@ParameterizedTest
@CsvSource({"10, paimon", "1000, paimon", "10, origin", "1000, origin"})
public void testNestedRead(int rowGroupSize, String writerType) throws Exception {
List<InternalRow> rows = prepareNestedData(1283);
List<InternalRow> rows = prepareNestedData(10);
Path path;
if ("paimon".equals(writerType)) {
path = createTempParquetFileByPaimon(folder, rows, rowGroupSize, NESTED_ARRAY_MAP_TYPE);
Expand Down

0 comments on commit fb8a8ff

Please sign in to comment.