
Commit

add
Aiden-Dong committed Nov 29, 2024
1 parent 40e21ac commit 8513f09
Showing 17 changed files with 301 additions and 270 deletions.
@@ -130,7 +130,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);

return new ParquetReader(
reader, requestedSchema, reader.getRecordCount(), poolOfBatches, fields);
reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
}

private void setReadOptions(ParquetReadOptions.Builder builder) {
@@ -406,7 +406,7 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
}

private void readNextRowGroup() throws IOException {
PageReadStore rowGroup = reader.readNextRowGroup();
PageReadStore rowGroup = reader.readNextFilteredRowGroup();
if (rowGroup == null) {
throw new IOException(
"expecting more rows but reached last block. Read "
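For context on the two call-site changes above (getRecordCount() -> getFilteredRecordCount() and readNextRowGroup() -> readNextFilteredRowGroup()): in parquet-mr the "filtered" variants return only the rows that survive the record filter and column-index (page) filter configured on the reader, so the record count handed to ParquetReader stays consistent with the row groups it will actually receive. A minimal sketch of that setup follows; the file path and the "id" column are assumptions made for illustration, not values from this commit.

    // Sketch: reading only filtered rows with parquet-mr (hypothetical file and column).
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class FilteredReadSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical input file and column name; both are assumptions for this sketch.
            Path path = new Path("/tmp/data.parquet");
            FilterPredicate predicate = FilterApi.gt(FilterApi.intColumn("id"), 100);

            ParquetReadOptions options =
                    ParquetReadOptions.builder()
                            .withRecordFilter(FilterCompat.get(predicate))
                            .useColumnIndexFilter(true) // enable page-level (column index) filtering
                            .build();

            try (ParquetFileReader reader =
                    ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()), options)) {
                // Counts only the rows that can survive the filter ...
                long filteredRows = reader.getFilteredRecordCount();
                // ... and the row groups handed back are filtered the same way.
                PageReadStore rowGroup;
                while ((rowGroup = reader.readNextFilteredRowGroup()) != null) {
                    System.out.println("rows in filtered row group: " + rowGroup.getRowCount());
                }
                System.out.println("filtered record count: " + filteredRows);
            }
        }
    }

If no filter is configured, the filtered variants fall back to the plain read path, so the swap should be a no-op for unfiltered reads; that is worth verifying against the parquet-mr version in use.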
@@ -28,7 +28,12 @@
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.impl.ColumnReaderImpl;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.PrimitiveType;
@@ -65,9 +70,11 @@ public abstract class AbstractColumnReader<VECTOR extends WritableColumnVector>
private boolean isCurrentPageDictionaryEncoded;

/** Total values in the current page. */
// private int pageValueCount;
// private int pageValueCount;

/** Helper struct to track intermediate states while reading Parquet pages in the column chunk.*/
/**
* Helper struct to track intermediate states while reading Parquet pages in the column chunk.
*/
private final ParquetReadState readState;

/*
@@ -147,13 +154,14 @@ public final void readToVector(int readNumber, VECTOR vector) throws IOException
if (readState.valuesToReadInPage == 0) {
int pageValueCount = readPage();
if (pageValueCount < 0) {
// we've read all the pages; this could happen when we're reading a repeated list and we
// we've read all the pages; this could happen when we're reading a repeated
// list and we
// don't know where the list will end until we've seen all the pages.
break;
}
}

if (readState.isFinished()){
if (readState.isFinished()) {
break;
}

@@ -166,38 +174,45 @@ public final void readToVector(int readNumber, VECTOR vector) throws IOException
long rangeStart = readState.currentRangeStart();
long rangeEnd = readState.currentRangeEnd();

if (pageRowId < rangeStart){
int toSkip = (int)(rangeStart - pageRowId);
if (toSkip >= leftInPage) { // drop page
if (pageRowId < rangeStart) {
int toSkip = (int) (rangeStart - pageRowId);
if (toSkip >= leftInPage) { // drop page
pageRowId += leftInPage;
leftInPage = 0;
}else {
} else {
if (isCurrentPageDictionaryEncoded) {
runLenDecoder.skipDictionaryIds(toSkip, maxDefLevel, this.dictionaryIdsDecoder);
runLenDecoder.skipDictionaryIds(
toSkip, maxDefLevel, this.dictionaryIdsDecoder);
pageRowId += toSkip;
leftInPage -= toSkip;
}else{
} else {
skipBatch(toSkip);
pageRowId += toSkip;
leftInPage -= toSkip;
}
}
}else if (pageRowId > rangeEnd){
} else if (pageRowId > rangeEnd) {
readState.nextRange();
}else{
} else {
long start = pageRowId;
long end = Math.min(rangeEnd, pageRowId + readBatch -1);
int num = (int) (end - start +1);
long end = Math.min(rangeEnd, pageRowId + readBatch - 1);
int num = (int) (end - start + 1);

if (isCurrentPageDictionaryEncoded) {
// Read and decode dictionary ids.
runLenDecoder.readDictionaryIds(
num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
num,
dictionaryIds,
vector,
rowId,
maxDefLevel,
this.dictionaryIdsDecoder);

if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
// Column vector supports lazy decoding of dictionary values so just set the
// dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary
// (i.e.
// some
// non-dictionary encoded values have already been added).
vector.setDictionary(new ParquetDictionary(dictionary));
@@ -206,7 +221,8 @@ public final void readToVector(int readNumber, VECTOR vector) throws IOException
}
} else {
if (vector.hasDictionary() && rowId != 0) {
// This batch already has dictionary encoded values but this new page is not.
// This batch already has dictionary encoded values but this new page is
// not.
// The batch
// does not support a mix of dictionary and not so we will decode the
// dictionary.
@@ -219,7 +235,6 @@ public final void readToVector(int readNumber, VECTOR vector) throws IOException
pageRowId += num;
leftInPage -= num;
rowId += num;

}
readState.rowsToReadInBatch = leftInBatch;
readState.valuesToReadInPage = leftInPage;
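Informally, the reworked loop above walks each page against the row ranges tracked by ParquetReadState (a Paimon-internal helper): rows that fall before the current range are skipped (skipBatch or skipDictionaryIds), a page position past the range advances to the next range, and rows inside the range are decoded into the vector. The standalone sketch below restates just that three-way decision; it deliberately drops the batch-size cap (readBatch / leftInBatch) and the dictionary branch, and its types are simplified stand-ins rather than the classes used here.

    // Standalone sketch (simplified types) of the skip / next-range / read decision above.
    import java.util.Arrays;
    import java.util.List;

    public class RangeReadSketch {

        /** Inclusive [start, end] row range, mirroring currentRangeStart()/currentRangeEnd(). */
        record RowRange(long start, long end) {}

        public static void main(String[] args) {
            List<RowRange> ranges = Arrays.asList(new RowRange(5, 9), new RowRange(20, 24));
            long pageRowId = 0;   // row index of the next value in the page
            int leftInPage = 30;  // values remaining in the page
            int rangeIdx = 0;

            while (leftInPage > 0 && rangeIdx < ranges.size()) {
                RowRange range = ranges.get(rangeIdx);
                if (pageRowId < range.start()) {
                    // Rows before the range: skip them (skipBatch / skipDictionaryIds in the reader).
                    int toSkip = (int) Math.min(range.start() - pageRowId, leftInPage);
                    System.out.println("skip " + toSkip + " rows starting at row " + pageRowId);
                    pageRowId += toSkip;
                    leftInPage -= toSkip;
                } else if (pageRowId > range.end()) {
                    // Page position is already past this range: advance (readState.nextRange()).
                    rangeIdx++;
                } else {
                    // Rows inside the range: decode them into the vector (readBatch / readDictionaryIds).
                    int num = (int) (Math.min(range.end(), pageRowId + leftInPage - 1) - pageRowId + 1);
                    System.out.println("read " + num + " rows starting at row " + pageRowId);
                    pageRowId += num;
                    leftInPage -= num;
                }
            }
        }
    }

With the sample ranges, the loop skips 5 rows, reads rows 5-9, skips 10 more, then reads rows 20-24, which mirrors how the reader consumes a 30-row page under those ranges.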
@@ -234,25 +249,27 @@ private int readPage() {
}
long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);

int pageValueCount = page.accept(new DataPage.Visitor<Integer>() {
@Override
public Integer visit(DataPageV1 dataPageV1) {
try {
return readPageV1(dataPageV1);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public Integer visit(DataPageV2 dataPageV2) {
try {
return readPageV2(dataPageV2);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
int pageValueCount =
page.accept(
new DataPage.Visitor<Integer>() {
@Override
public Integer visit(DataPageV1 dataPageV1) {
try {
return readPageV1(dataPageV1);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public Integer visit(DataPageV2 dataPageV2) {
try {
return readPageV2(dataPageV2);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
return pageValueCount;
}
@@ -332,8 +349,7 @@ private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in, int
afterReadPage();
}


final void skipDataBuffer(int length){
final void skipDataBuffer(int length) {
try {
dataInputStream.skipFully(length);
} catch (IOException e) {
@@ -23,7 +23,6 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.PrimitiveType;

@@ -23,7 +23,6 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;

import java.io.IOException;
@@ -32,7 +31,8 @@
/** Byte {@link ColumnReader}. Using INT32 to store byte, so just cast int to byte. */
public class ByteColumnReader extends AbstractColumnReader<WritableByteVector> {

public ByteColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException {
public ByteColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore)
throws IOException {
super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
}
@@ -23,7 +23,6 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;

import java.io.IOException;
@@ -87,7 +86,8 @@ protected void skipBatch(int num) {
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] == maxDefLevel) {
if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
== maxDefLevel) {
skipBinary(1);
}
}
@@ -98,7 +98,7 @@
}
}

private void skipBinary(int num){
private void skipBinary(int num) {
for (int i = 0; i < num; i++) {
int len = readDataBuffer(4).getInt();
skipDataBuffer(len);
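A note on the skip path touched above: a PLAIN-encoded BYTE_ARRAY value is stored as a 4-byte little-endian length followed by the raw bytes, so skipBinary only needs to read the length and advance the stream by that many bytes, and skipBatch consults the definition-level decoder first because null values occupy no space in the data buffer. The snippet below is a standalone illustration of that framing using a plain ByteBuffer instead of the reader's ByteBufferInputStream; the sample values are made up.

    // Standalone sketch: skipping length-prefixed BYTE_ARRAY values in a PLAIN-style buffer.
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.charset.StandardCharsets;

    public class SkipBinarySketch {

        /** Skip {@code num} values: read the 4-byte length prefix, then jump over the payload. */
        static void skipBinary(ByteBuffer data, int num) {
            for (int i = 0; i < num; i++) {
                int len = data.getInt();               // 4-byte little-endian length prefix
                data.position(data.position() + len);  // skip the value bytes
            }
        }

        public static void main(String[] args) {
            // Build a tiny buffer with three made-up values: "foo", "barbar", "z".
            byte[][] values = {
                "foo".getBytes(StandardCharsets.UTF_8),
                "barbar".getBytes(StandardCharsets.UTF_8),
                "z".getBytes(StandardCharsets.UTF_8)
            };
            ByteBuffer buf = ByteBuffer.allocate(64).order(ByteOrder.LITTLE_ENDIAN);
            for (byte[] v : values) {
                buf.putInt(v.length).put(v);
            }
            buf.flip();

            skipBinary(buf, 2);                        // drop "foo" and "barbar"
            byte[] third = new byte[buf.getInt()];
            buf.get(third);
            System.out.println(new String(third, StandardCharsets.UTF_8)); // prints "z"
        }
    }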
@@ -23,7 +23,6 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;

import java.io.IOException;
@@ -99,7 +98,7 @@ protected void skipBatch(int num) {
}
}

private void skipDouble(int num){
private void skipDouble(int num) {
skipDataBuffer(8 * num);
}

@@ -26,7 +26,6 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;

@@ -40,7 +39,8 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector>
private final int precision;

public FixedLenBytesColumnReader(
ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision) throws IOException {
ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision)
throws IOException {
super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
this.precision = precision;
@@ -100,17 +100,15 @@ protected void skipBatch(int num) {
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
skipDataBinary(bytesLen);

}
}
}
}

private void skipDataBinary(int len){
private void skipDataBinary(int len) {
skipDataBuffer(len);
}


@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) {
@@ -23,7 +23,6 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;

import java.io.IOException;
@@ -71,7 +70,6 @@ protected void readBatch(int rowId, int num, WritableFloatVector column) {
}
}


@Override
protected void skipBatch(int num) {
int left = num;
@@ -100,11 +98,10 @@ protected void skipBatch(int num) {
}
}

private void skipFloat(int num){
private void skipFloat(int num) {
skipDataBuffer(4 * num);
}


@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableFloatVector column, WritableIntVector dictionaryIds) {
@@ -22,7 +22,6 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;

import java.io.IOException;
@@ -31,7 +30,8 @@
/** Int {@link ColumnReader}. */
public class IntColumnReader extends AbstractColumnReader<WritableIntVector> {

public IntColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException {
public IntColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore)
throws IOException {
super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
}
@@ -101,7 +101,6 @@ private void skipInteger(int num) {
skipDataBuffer(4 * num);
}


@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableIntVector column, WritableIntVector dictionaryIds) {
@@ -23,7 +23,6 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;

import java.io.IOException;
@@ -32,7 +31,8 @@
/** Long {@link ColumnReader}. */
public class LongColumnReader extends AbstractColumnReader<WritableLongVector> {

public LongColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException {
public LongColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore)
throws IOException {
super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
@@ -98,7 +98,7 @@ protected void skipBatch(int num) {
}
}

private void skipValue(int num){
private void skipValue(int num) {
skipDataBuffer(num * 8);
}

@@ -279,7 +279,7 @@ private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
reader =
new NestedPrimitiveColumnReader(
descriptor,
pages.getPageReader(descriptor),
pages,
isUtcTimestamp,
descriptor.getPrimitiveType(),
field.getType(),
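The last hunk passes the whole PageReadStore into NestedPrimitiveColumnReader instead of a pre-resolved PageReader, matching the constructor change applied to the flat column readers earlier in this commit. A reader holding the store can resolve its own page reader and see row-group level information; the sketch below shows that resolution in isolation and is illustrative only, not Paimon's actual constructor.

    // Sketch: a column reader resolving its PageReader from the row group's PageReadStore.
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.column.page.PageReader;

    public class NestedReaderSketch {

        private final PageReader pageReader;
        private final long rowGroupRowCount;

        public NestedReaderSketch(ColumnDescriptor descriptor, PageReadStore pageReadStore) {
            // The same lookup the caller previously performed before passing the result in.
            this.pageReader = pageReadStore.getPageReader(descriptor);
            // Holding the store also exposes row-group level information to the reader.
            this.rowGroupRowCount = pageReadStore.getRowCount();
        }

        public PageReader pageReader() {
            return pageReader;
        }

        public long rowGroupRowCount() {
            return rowGroupRowCount;
        }
    }

Taking the store keeps the nested reader consistent with the flat readers, which now also receive the PageReadStore directly.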