Skip to content

Commit

Permalink
[core] Fix Parquet reader failing to read an empty row when the first column is an array.
Browse files Browse the repository at this point in the history
  • Loading branch information
wenchao.wu committed Dec 15, 2024
1 parent d61f3d2 commit fa4cd3a
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.columnar.writable.AbstractWritableVector;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.utils.Preconditions;

import java.nio.ByteOrder;
import java.util.Arrays;
Expand Down Expand Up @@ -54,6 +55,23 @@ public AbstractHeapVector(int len) {
this.len = len;
}

/**
 * Creates an all-null vector of the given length for the case where the inner vectors carry
 * no data at all, so no per-row data storage needs to be allocated.
 *
 * @param len number of rows in the vector
 * @param isNull per-row null flags; every entry must be {@code true}
 * @throws IllegalArgumentException if {@code len != isNull.length}
 * @throws UnsupportedOperationException if any entry of {@code isNull} is {@code false}
 */
public AbstractHeapVector(int len, boolean[] isNull) {
    Preconditions.checkArgument(
            len == isNull.length, "len should be equal to isNull's length.");

    // Reject any non-null row up front: this constructor skips allocating the backing
    // data arrays, so it is only valid when every single row is null.
    for (int i = 0; i < isNull.length; i++) {
        if (!isNull[i]) {
            throw new UnsupportedOperationException(
                    "This constructor can only be called when the vector is all null.");
        }
    }

    this.len = len;
    this.isNull = isNull;
    this.noNulls = false;
}

/**
* Resets the column to default state. - fills the isNull array with false. - sets noNulls to
* true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public HeapBooleanVector(int len) {
vector = new boolean[len];
}

/**
 * Creates an all-null boolean vector; unlike {@link #HeapBooleanVector(int)}, the backing
 * {@code vector} array is not allocated, since every row is null.
 */
public HeapBooleanVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public HeapIntVector reserveDictionaryIds(int capacity) {
throw new RuntimeException("HeapBooleanVector has no dictionary.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public HeapByteVector(int len) {
vector = new byte[len];
}

/**
 * Creates an all-null byte vector; unlike {@link #HeapByteVector(int)}, the backing
 * {@code vector} array is not allocated, since every row is null.
 */
public HeapByteVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public byte getByte(int i) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public HeapBytesVector(int size) {
length = new int[size];
}

/**
 * Creates an all-null bytes vector. The per-row offset/length bookkeeping arrays that
 * {@link #HeapBytesVector(int)} allocates are left unallocated here.
 * NOTE(review): assumes callers never read data fields of an all-null vector — TODO confirm.
 */
public HeapBytesVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public void reset() {
super.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public HeapDoubleVector(int len) {
vector = new double[len];
}

/**
 * Creates an all-null double vector; unlike {@link #HeapDoubleVector(int)}, the backing
 * {@code vector} array is not allocated, since every row is null.
 */
public HeapDoubleVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public double getDouble(int i) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public HeapFloatVector(int len) {
vector = new float[len];
}

/**
 * Creates an all-null float vector; unlike {@link #HeapFloatVector(int)}, the backing
 * {@code vector} array is not allocated, since every row is null.
 */
public HeapFloatVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public float getFloat(int i) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public HeapIntVector(int len) {
vector = new int[len];
}

/**
 * Creates an all-null int vector; unlike {@link #HeapIntVector(int)}, the backing
 * {@code vector} array is not allocated, since every row is null.
 */
public HeapIntVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public int getInt(int i) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public HeapLongVector(int len) {
vector = new long[len];
}

/**
 * Creates an all-null long vector; unlike {@link #HeapLongVector(int)}, the backing
 * {@code vector} array is not allocated, since every row is null.
 */
public HeapLongVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public long getLong(int i) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public HeapShortVector(int len) {
vector = new short[len];
}

/**
 * Creates an all-null short vector; unlike {@link #HeapShortVector(int)}, the backing
 * {@code vector} array is not allocated, since every row is null.
 */
public HeapShortVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public short getShort(int i) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ public class HeapTimestampVector extends AbstractHeapVector implements WritableT

private static final long serialVersionUID = 1L;

private final long[] milliseconds;
private final int[] nanoOfMilliseconds;
private long[] milliseconds;
private int[] nanoOfMilliseconds;

/**
 * Creates a timestamp vector of {@code len} rows, allocating storage for the millisecond
 * and nano-of-millisecond components of each timestamp.
 */
public HeapTimestampVector(int len) {
    super(len);
    this.milliseconds = new long[len];
    this.nanoOfMilliseconds = new int[len];
}

/**
 * Creates an all-null timestamp vector. The {@code milliseconds} and
 * {@code nanoOfMilliseconds} arrays are left unallocated (they were made non-final to
 * allow this). NOTE(review): assumes reads always check null flags first — TODO confirm.
 */
public HeapTimestampVector(int len, boolean[] isNull) {
    super(len, isNull);
}

@Override
public Timestamp getTimestamp(int i, int precision) {
if (dictionary == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ public static CollectionPosition calculateCollectionOffsets(

offsets.add(offset);
valueCount++;
} else {
// else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the
// collection is not defined, no need to increase offset.
nullCollectionFlags.add(true);
nullValuesCount++;
emptyCollectionFlags.add(false);
valueCount++;
}
// else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the
// collection is not defined, just ignore it
}
long[] offsetsArray = offsets.toArray();
long[] length = calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
Expand Down Expand Up @@ -106,6 +107,8 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV

private boolean isFirstRow = true;

private boolean cutLevel = false;

private final LastValueContainer lastValue = new LastValueContainer();

public NestedPrimitiveColumnReader(
Expand Down Expand Up @@ -168,7 +171,13 @@ public WritableColumnVector readAndNewVector(int readNumber, WritableColumnVecto

int valueIndex = collectDataFromParquetPage(readNumber, valueList);

return fillColumnVector(valueIndex, valueList);
if (!valueList.isEmpty()) {
return fillColumnVector(valueIndex, valueList);
}
if (readNumber < repetitionLevelList.size()) {
cutLevel = true;
}
return fillColumnVectorWithNone(valueIndex);
}

private int collectDataFromParquetPage(int total, List<Object> valueList) throws IOException {
Expand Down Expand Up @@ -199,8 +208,8 @@ private int collectDataFromParquetPage(int total, List<Object> valueList) throws

if (!lastValue.shouldSkip && !needFilterSkip) {
valueList.add(lastValue.value);
valueIndex++;
}
valueIndex++;
} while (readValue() && (repetitionLevel != 0));

if (pageRowId == readState.rowId) {
Expand All @@ -222,6 +231,11 @@ public LevelDelegation getLevelDelegation() {
definitionLevelList.clear();
repetitionLevelList.add(repetitionLevel);
definitionLevelList.add(definitionLevel);
if (cutLevel) {
repetition = Arrays.copyOf(repetition, repetition.length - 1);
definition = Arrays.copyOf(definition, definition.length - 1);
cutLevel = false;
}
return new LevelDelegation(repetition, definition);
}

Expand Down Expand Up @@ -549,6 +563,53 @@ private WritableColumnVector fillColumnVector(int total, List valueList) {
}
}

/**
 * Builds a column vector of {@code total} rows that are all null, without allocating
 * per-row data storage. Used when no values at all were collected from the Parquet page.
 *
 * @param total number of (null) rows the resulting vector must report
 * @return a writable vector matching {@code dataType} with every row flagged null
 */
private WritableColumnVector fillColumnVectorWithNone(int total) {
    // Every row is null, so a uniformly-true flag array is all the data we need.
    boolean[] nulls = new boolean[total];
    Arrays.fill(nulls, true);
    switch (dataType.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
        case BINARY:
        case VARBINARY:
            return new HeapBytesVector(total, nulls);
        case BOOLEAN:
            return new HeapBooleanVector(total, nulls);
        case TINYINT:
            return new HeapByteVector(total, nulls);
        case SMALLINT:
            return new HeapShortVector(total, nulls);
        case INTEGER:
        case DATE:
        case TIME_WITHOUT_TIME_ZONE:
            return new HeapIntVector(total, nulls);
        case FLOAT:
            return new HeapFloatVector(total, nulls);
        case BIGINT:
            return new HeapLongVector(total, nulls);
        case DOUBLE:
            return new HeapDoubleVector(total, nulls);
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return new HeapTimestampVector(total, nulls);
        case DECIMAL:
            // The physical vector backing a decimal depends on the Parquet primitive type.
            PrimitiveType.PrimitiveTypeName primitiveTypeName =
                    descriptor.getPrimitiveType().getPrimitiveTypeName();
            if (primitiveTypeName == PrimitiveType.PrimitiveTypeName.INT32) {
                return new ParquetDecimalVector(new HeapIntVector(total, nulls), total);
            } else if (primitiveTypeName == PrimitiveType.PrimitiveTypeName.INT64) {
                return new ParquetDecimalVector(new HeapLongVector(total, nulls), total);
            } else {
                return new ParquetDecimalVector(new HeapBytesVector(total, nulls), total);
            }
        default:
            throw new RuntimeException("Unsupported type in the list: " + type);
    }
}

private static HeapBytesVector getHeapBytesVector(int total, List valueList) {
HeapBytesVector phbv = new HeapBytesVector(total);
for (int i = 0; i < valueList.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
Expand Down Expand Up @@ -176,7 +177,11 @@ public class ParquetReadWriteTest {
new ArrayType(true, new IntType())))
.field("c", new IntType())
.build()),
new IntType()));
new IntType()),
RowType.of(
new ArrayType(RowType.of(new VarCharType(255))),
RowType.of(new IntType()),
new VarCharType(255)));

@TempDir public File folder;

Expand Down Expand Up @@ -822,7 +827,8 @@ null, new GenericMap(mp1), new GenericMap(mp2)
}),
i)
}),
i)));
i),
null));
}
return rows;
}
Expand All @@ -834,11 +840,14 @@ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int rowGrou
MessageType schema =
ParquetSchemaConverter.convertToParquetMessageType(
"paimon-parquet", NESTED_ARRAY_MAP_TYPE);
String[] candidates = new String[] {"snappy", "zstd", "gzip"};
String compress = candidates[new Random().nextInt(3)];
try (ParquetWriter<Group> writer =
ExampleParquetWriter.builder(
HadoopOutputFile.fromPath(
new org.apache.hadoop.fs.Path(path.toString()), conf))
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withCompressionCodec(CompressionCodecName.fromConf(compress))
.withConf(new Configuration())
.withType(schema)
.build()) {
Expand Down Expand Up @@ -1011,6 +1020,10 @@ private void compareNestedRow(
origin.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1),
result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1));
Assertions.assertEquals(origin.getRow(5, 2).getInt(1), result.getRow(5, 2).getInt(1));
Assertions.assertTrue(result.isNullAt(6));
Assertions.assertTrue(result.getRow(6, 2).isNullAt(0));
Assertions.assertTrue(result.getRow(6, 2).isNullAt(1));
Assertions.assertTrue(result.getRow(6, 2).isNullAt(2));
}
assertThat(iterator.hasNext()).isFalse();
iterator.close();
Expand Down

0 comments on commit fa4cd3a

Please sign in to comment.