Skip to content

Commit

Permalink
[format] support parquet reader reading field with 'FIXED_LEN_BYTE_AR…
Browse files Browse the repository at this point in the history
…RAY' type (apache#4759)
  • Loading branch information
LsomeYeah authored Dec 23, 2024
1 parent 51d1ec9 commit a6127fc
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.parquet.reader;

import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;

import java.io.IOException;

/** Fixed length bytes {@link ColumnReader}, just for Binary. */
public class FixedLenBytesBinaryColumnReader<VECTOR extends WritableColumnVector>
extends FixedLenBytesColumnReader<VECTOR> {

public FixedLenBytesBinaryColumnReader(
ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision)
throws IOException {
super(descriptor, pageReadStore, precision);
}

@Override
protected void readBatch(int rowId, int num, VECTOR column) {
int bytesLen = descriptor.getPrimitiveType().getTypeLength();
WritableBytesVector bytesVector = (WritableBytesVector) column;
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
byte[] bytes = readDataBinary(bytesLen).getBytesUnsafe();
bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length);
} else {
bytesVector.setNullAt(rowId + i);
}
}
}

@Override
protected void skipBatch(int num) {
int bytesLen = descriptor.getPrimitiveType().getTypeLength();

for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
skipDataBinary(bytesLen);
}
}
}

@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) {

WritableBytesVector bytesVector = (WritableBytesVector) column;
for (int i = rowId; i < rowId + num; ++i) {
if (!bytesVector.isNullAt(i)) {
byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytesUnsafe();
bytesVector.appendBytes(i, v, 0, v.length);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@

package org.apache.paimon.format.parquet.reader;

import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.data.columnar.writable.WritableLongVector;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
Expand All @@ -33,10 +29,10 @@
import java.nio.ByteBuffer;

/** Fixed length bytes {@link ColumnReader}, just for Decimal. */
public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector>
public abstract class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector>
extends AbstractColumnReader<VECTOR> {

private final int precision;
protected final int precision;

public FixedLenBytesColumnReader(
ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision)
Expand All @@ -46,116 +42,11 @@ public FixedLenBytesColumnReader(
this.precision = precision;
}

@Override
protected void readBatch(int rowId, int num, VECTOR column) {
int bytesLen = descriptor.getPrimitiveType().getTypeLength();
if (ParquetSchemaConverter.is32BitDecimal(precision)) {
WritableIntVector intVector = (WritableIntVector) column;
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
intVector.setInt(rowId + i, (int) heapBinaryToLong(readDataBinary(bytesLen)));
} else {
intVector.setNullAt(rowId + i);
}
}
} else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
WritableLongVector longVector = (WritableLongVector) column;
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
longVector.setLong(rowId + i, heapBinaryToLong(readDataBinary(bytesLen)));
} else {
longVector.setNullAt(rowId + i);
}
}
} else {
WritableBytesVector bytesVector = (WritableBytesVector) column;
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
byte[] bytes = readDataBinary(bytesLen).getBytesUnsafe();
bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length);
} else {
bytesVector.setNullAt(rowId + i);
}
}
}
}

@Override
protected void skipBatch(int num) {
int bytesLen = descriptor.getPrimitiveType().getTypeLength();
if (ParquetSchemaConverter.is32BitDecimal(precision)) {
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
skipDataBinary(bytesLen);
}
}
} else if (ParquetSchemaConverter.is64BitDecimal(precision)) {

for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
skipDataBinary(bytesLen);
}
}
} else {
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
skipDataBinary(bytesLen);
}
}
}
}

private void skipDataBinary(int len) {
protected void skipDataBinary(int len) {
skipDataBuffer(len);
}

@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) {
if (ParquetSchemaConverter.is32BitDecimal(precision)) {
WritableIntVector intVector = (WritableIntVector) column;
for (int i = rowId; i < rowId + num; ++i) {
if (!intVector.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
intVector.setInt(i, (int) heapBinaryToLong(v));
}
}
} else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
WritableLongVector longVector = (WritableLongVector) column;
for (int i = rowId; i < rowId + num; ++i) {
if (!longVector.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
longVector.setLong(i, heapBinaryToLong(v));
}
}
} else {
WritableBytesVector bytesVector = (WritableBytesVector) column;
for (int i = rowId; i < rowId + num; ++i) {
if (!bytesVector.isNullAt(i)) {
byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytesUnsafe();
bytesVector.appendBytes(i, v, 0, v.length);
}
}
}
}

private long heapBinaryToLong(Binary binary) {
ByteBuffer buffer = binary.toByteBuffer();
byte[] bytes = buffer.array();
int start = buffer.arrayOffset() + buffer.position();
int end = buffer.arrayOffset() + buffer.limit();

long unscaled = 0L;

for (int i = start; i < end; i++) {
unscaled = (unscaled << 8) | (bytes[i] & 0xff);
}

int bits = 8 * (end - start);
return (unscaled << (64 - bits)) >> (64 - bits);
}

private Binary readDataBinary(int len) {
protected Binary readDataBinary(int len) {
ByteBuffer buffer = readDataBuffer(len);
if (buffer.hasArray()) {
return Binary.fromConstantByteArray(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.parquet.reader;

import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.data.columnar.writable.WritableLongVector;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.api.Binary;

import java.io.IOException;
import java.nio.ByteBuffer;

/** Fixed length bytes {@link ColumnReader}, just for Decimal. */
public class FixedLenBytesDecimalColumnReader<VECTOR extends WritableColumnVector>
extends FixedLenBytesColumnReader<VECTOR> {

public FixedLenBytesDecimalColumnReader(
ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision)
throws IOException {
super(descriptor, pageReadStore, precision);
}

@Override
protected void readBatch(int rowId, int num, VECTOR column) {
int bytesLen = descriptor.getPrimitiveType().getTypeLength();
if (ParquetSchemaConverter.is32BitDecimal(precision)) {
WritableIntVector intVector = (WritableIntVector) column;
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
intVector.setInt(rowId + i, (int) heapBinaryToLong(readDataBinary(bytesLen)));
} else {
intVector.setNullAt(rowId + i);
}
}
} else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
WritableLongVector longVector = (WritableLongVector) column;
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
longVector.setLong(rowId + i, heapBinaryToLong(readDataBinary(bytesLen)));
} else {
longVector.setNullAt(rowId + i);
}
}
} else {
WritableBytesVector bytesVector = (WritableBytesVector) column;
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
byte[] bytes = readDataBinary(bytesLen).getBytesUnsafe();
bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length);
} else {
bytesVector.setNullAt(rowId + i);
}
}
}
}

@Override
protected void skipBatch(int num) {
int bytesLen = descriptor.getPrimitiveType().getTypeLength();
if (ParquetSchemaConverter.is32BitDecimal(precision)) {
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
skipDataBinary(bytesLen);
}
}
} else if (ParquetSchemaConverter.is64BitDecimal(precision)) {

for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
skipDataBinary(bytesLen);
}
}
} else {
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
skipDataBinary(bytesLen);
}
}
}
}

@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) {
if (ParquetSchemaConverter.is32BitDecimal(precision)) {
WritableIntVector intVector = (WritableIntVector) column;
for (int i = rowId; i < rowId + num; ++i) {
if (!intVector.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
intVector.setInt(i, (int) heapBinaryToLong(v));
}
}
} else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
WritableLongVector longVector = (WritableLongVector) column;
for (int i = rowId; i < rowId + num; ++i) {
if (!longVector.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
longVector.setLong(i, heapBinaryToLong(v));
}
}
} else {
WritableBytesVector bytesVector = (WritableBytesVector) column;
for (int i = rowId; i < rowId + num; ++i) {
if (!bytesVector.isNullAt(i)) {
byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytesUnsafe();
bytesVector.appendBytes(i, v, 0, v.length);
}
}
}
}

private long heapBinaryToLong(Binary binary) {
ByteBuffer buffer = binary.toByteBuffer();
byte[] bytes = buffer.array();
int start = buffer.arrayOffset() + buffer.position();
int end = buffer.arrayOffset() + buffer.limit();

long unscaled = 0L;

for (int i = start; i < end; i++) {
unscaled = (unscaled << 8) | (bytes[i] & 0xff);
}

int bits = 8 * (end - start);
return (unscaled << (64 - bits)) >> (64 - bits);
}
}
Loading

0 comments on commit a6127fc

Please sign in to comment.