Commit: update

Zouxxyy committed Jan 11, 2025
1 parent 247ad15 commit 89e1e2b
Showing 2 changed files with 18 additions and 24 deletions.
@@ -91,18 +91,15 @@ public class ParquetReaderFactory implements FormatReaderFactory {
     private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
 
     private final Options conf;
-
-    private final RowType projectedType;
-    private final DataField[] projectedFields;
+    private final DataField[] readFields;
     private final int batchSize;
     private final FilterCompat.Filter filter;
     private final Set<Integer> unknownFieldsIndices = new HashSet<>();
 
     public ParquetReaderFactory(
-            Options conf, RowType projectedType, int batchSize, FilterCompat.Filter filter) {
+            Options conf, RowType readType, int batchSize, FilterCompat.Filter filter) {
         this.conf = conf;
-        this.projectedType = projectedType;
-        this.projectedFields = projectedType.getFields().toArray(new DataField[0]);
+        this.readFields = readType.getFields().toArray(new DataField[0]);
         this.batchSize = batchSize;
         this.filter = filter;
     }
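
For orientation, a minimal caller-side sketch of the new constructor (hypothetical values, not part of the commit; assumes Paimon's RowType.builder() and DataTypes helpers and Parquet's FilterCompat.NOOP):

    // Hypothetical usage sketch: the read schema arrives as a RowType and is
    // flattened once into DataField[] readFields by the constructor.
    RowType readType =
            RowType.builder()
                    .field("id", DataTypes.INT())
                    .field("name", DataTypes.STRING())
                    .build();
    ParquetReaderFactory factory =
            new ParquetReaderFactory(new Options(), readType, 1024, FilterCompat.NOOP);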
@@ -129,8 +126,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
                 createPoolOfBatches(context.filePath(), requestedSchema);
 
         MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
-        List<ParquetField> fields =
-                buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);
+        List<ParquetField> fields = buildFieldsList(readFields, columnIO);
 
         return new ParquetReader(
                 reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
@@ -158,19 +154,19 @@ private void setReadOptions(ParquetReadOptions.Builder builder) {
 
     /** Clips `parquetSchema` according to `fieldNames`. */
     private MessageType clipParquetSchema(GroupType parquetSchema) {
-        Type[] types = new Type[projectedFields.length];
-        for (int i = 0; i < projectedFields.length; ++i) {
-            String fieldName = projectedFields[i].name();
+        Type[] types = new Type[readFields.length];
+        for (int i = 0; i < readFields.length; ++i) {
+            String fieldName = readFields[i].name();
             if (!parquetSchema.containsField(fieldName)) {
                 LOG.warn(
                         "{} does not exist in {}, will fill the field with null.",
                         fieldName,
                         parquetSchema);
-                types[i] = ParquetSchemaConverter.convertToParquetType(projectedFields[i]);
+                types[i] = ParquetSchemaConverter.convertToParquetType(readFields[i]);
                 unknownFieldsIndices.add(i);
             } else {
                 Type parquetType = parquetSchema.getType(fieldName);
-                types[i] = clipParquetType(projectedFields[i].type(), parquetType);
+                types[i] = clipParquetType(readFields[i].type(), parquetType);
             }
         }

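An illustrative sketch of the null-fill path above (not part of the commit; assumes Parquet's MessageTypeParser): when the read schema requests a field that the file was written without, clipParquetSchema converts the Paimon type to a Parquet type and records the index, so the column is later filled with nulls instead of failing the read.

    // File written before the column "name" was added to the table schema:
    GroupType fileSchema =
            MessageTypeParser.parseMessageType(
                    "message file_schema { required int32 id; }");
    // clipParquetSchema sees the miss and records the index in
    // unknownFieldsIndices:
    boolean missing = !fileSchema.containsField("name"); // true -> null-filled
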
@@ -220,7 +216,7 @@ private Type clipParquetType(DataType readType, Type parquetType) {
 
     private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
             throws IOException, UnsupportedOperationException {
-        if (projectedFields.length != requestedSchema.getFieldCount()) {
+        if (readFields.length != requestedSchema.getFieldCount()) {
             throw new RuntimeException(
                     "The quality of field type is incompatible with the request schema!");
         }
@@ -268,13 +264,13 @@ private ParquetReaderBatch createReaderBatch(
     }
 
     private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
-        WritableColumnVector[] columns = new WritableColumnVector[projectedFields.length];
+        WritableColumnVector[] columns = new WritableColumnVector[readFields.length];
         List<Type> types = requestedSchema.getFields();
-        for (int i = 0; i < projectedFields.length; i++) {
+        for (int i = 0; i < readFields.length; i++) {
             columns[i] =
                     createWritableColumnVector(
                             batchSize,
-                            projectedFields[i].type(),
+                            readFields[i].type(),
                             types.get(i),
                             requestedSchema.getColumns(),
                             0);
@@ -290,7 +286,7 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
             WritableColumnVector[] writableVectors) {
         ColumnVector[] vectors = new ColumnVector[writableVectors.length];
         for (int i = 0; i < writableVectors.length; i++) {
-            switch (projectedFields[i].type().getTypeRoot()) {
+            switch (readFields[i].type().getTypeRoot()) {
                 case DECIMAL:
                     vectors[i] =
                             new ParquetDecimalVector(
@@ -429,7 +425,7 @@ private void readNextRowGroup() throws IOException {
                 if (!unknownFieldsIndices.contains(i)) {
                     columnReaders[i] =
                             createColumnReader(
-                                    projectedFields[i].type(),
+                                    readFields[i].type(),
                                     types.get(i),
                                     requestedSchema.getColumns(),
                                     rowGroup,
Second changed file:
@@ -381,12 +381,10 @@ private static List<ColumnDescriptor> getAllColumnDescriptorByType(
     }
 
     public static List<ParquetField> buildFieldsList(
-            List<DataField> children, List<String> fieldNames, MessageColumnIO columnIO) {
+            DataField[] readFields, MessageColumnIO columnIO) {
         List<ParquetField> list = new ArrayList<>();
-        for (int i = 0; i < children.size(); i++) {
-            list.add(
-                    constructField(
-                            children.get(i), lookupColumnByName(columnIO, fieldNames.get(i))));
+        for (DataField readField : readFields) {
+            list.add(constructField(readField, lookupColumnByName(columnIO, readField.name())));
         }
         return list;
     }
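
Call-site sketch matching the createReader change in the first file: with the new signature the caller passes the DataField[] directly, and each column is looked up by DataField.name() rather than through a parallel fieldNames list.

    // Taken from the diff above (names from the commit, not invented):
    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
    List<ParquetField> fields = buildFieldsList(readFields, columnIO);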
