Skip to content

Commit

Permalink
Parquet: Refactor BasePageIterator to add initRepetitionLevelsReader (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac authored Mar 15, 2024
1 parent 59d79e7 commit 560b723
Showing 1 changed file with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ protected abstract void initDefinitionLevelsReader(
protected abstract void initDefinitionLevelsReader(
DataPageV2 dataPageV2, ColumnDescriptor descriptor) throws IOException;

protected void initRepetitionLevelsReader(
DataPageV1 dataPageV1, ColumnDescriptor descriptor, ByteBufferInputStream in, int count)
throws IOException {
ValuesReader rlReader =
dataPageV1.getRlEncoding().getValuesReader(descriptor, ValuesType.REPETITION_LEVEL);
this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
rlReader.initFromPage(count, in);
}

protected void initRepetitionLevelsReader(DataPageV2 dataPageV2, ColumnDescriptor descriptor)
throws IOException {
this.repetitionLevels =
newRLEIterator(descriptor.getMaxRepetitionLevel(), dataPageV2.getRepetitionLevels());
}

public int currentPageCount() {
return triplesCount;
}
Expand Down Expand Up @@ -112,15 +127,12 @@ public ValuesReader visit(DataPageV2 dataPageV2) {

protected void initFromPage(DataPageV1 initPage) {
this.triplesCount = initPage.getValueCount();
ValuesReader rlReader =
initPage.getRlEncoding().getValuesReader(desc, ValuesType.REPETITION_LEVEL);
this.repetitionLevels = new ValuesReaderIntIterator(rlReader);
try {
BytesInput bytes = initPage.getBytes();
LOG.debug("page size {} bytes and {} records", bytes.size(), triplesCount);
LOG.debug("reading repetition levels at 0");
ByteBufferInputStream in = bytes.toInputStream();
rlReader.initFromPage(triplesCount, in);
initRepetitionLevelsReader(initPage, desc, in, triplesCount);
LOG.debug("reading definition levels at {}", in.position());
initDefinitionLevelsReader(initPage, desc, in, triplesCount);
LOG.debug("reading data at {}", in.position());
Expand All @@ -132,9 +144,8 @@ protected void initFromPage(DataPageV1 initPage) {

protected void initFromPage(DataPageV2 initPage) {
this.triplesCount = initPage.getValueCount();
this.repetitionLevels =
newRLEIterator(desc.getMaxRepetitionLevel(), initPage.getRepetitionLevels());
try {
initRepetitionLevelsReader(initPage, desc);
initDefinitionLevelsReader(initPage, desc);
LOG.debug("page data size {} bytes and {} records", initPage.getData().size(), triplesCount);
initDataReader(initPage.getDataEncoding(), initPage.getData().toInputStream(), triplesCount);
Expand Down

0 comments on commit 560b723

Please sign in to comment.