From a6127fc145f612c7f3b4795999bb7916a4f14596 Mon Sep 17 00:00:00 2001
From: LsomeYeah <94825748+LsomeYeah@users.noreply.github.com>
Date: Mon, 23 Dec 2024 19:33:25 +0800
Subject: [PATCH] [format] support parquet reader reading field with 'FIXED_LEN_BYTE_ARRAY' type (#4759)

---
 .../FixedLenBytesBinaryColumnReader.java      |  77 +++++++++
 .../reader/FixedLenBytesColumnReader.java     | 117 +-------------
 .../FixedLenBytesDecimalColumnReader.java     | 148 ++++++++++++++++++
 .../reader/ParquetSplitReaderUtil.java        |  16 +-
 .../format/parquet/ParquetReadWriteTest.java  |  70 ++++++++
 5 files changed, 313 insertions(+), 115 deletions(-)
 create mode 100644 paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesBinaryColumnReader.java
 create mode 100644 paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesDecimalColumnReader.java
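Context for review: before this patch, createColumnReader sent CHAR/VARCHAR/BINARY/VARBINARY
fields straight to BytesColumnReader, which assumes the variable-length BINARY physical type,
so a Paimon binary field backed by a Parquet FIXED_LEN_BYTE_ARRAY column could not be read.
The snippet below is an illustration, not part of the patch (the field names are invented);
it uses the parquet-mr Types builder to construct the kind of schema this change makes readable:

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class FixedLenSchemaExample {
        public static void main(String[] args) {
            // A 16-byte digest stored with the fixed-length physical type; mapping it
            // to a Paimon BINARY(16) field is the case this patch enables.
            MessageType schema =
                    Types.buildMessage()
                            .required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
                            .length(16)
                            .named("fingerprint")
                            .named("example");
            System.out.println(schema);
        }
    }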
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesBinaryColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesBinaryColumnReader.java
new file mode 100644
index 000000000000..5f8c78ca4cb5
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesBinaryColumnReader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.columnar.writable.WritableBytesVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableIntVector;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+
+import java.io.IOException;
+
+/** Fixed length bytes {@link ColumnReader}, just for Binary. */
+public class FixedLenBytesBinaryColumnReader<VECTOR extends WritableColumnVector>
+        extends FixedLenBytesColumnReader<VECTOR> {
+
+    public FixedLenBytesBinaryColumnReader(
+            ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision)
+            throws IOException {
+        super(descriptor, pageReadStore, precision);
+    }
+
+    @Override
+    protected void readBatch(int rowId, int num, VECTOR column) {
+        int bytesLen = descriptor.getPrimitiveType().getTypeLength();
+        WritableBytesVector bytesVector = (WritableBytesVector) column;
+        for (int i = 0; i < num; i++) {
+            if (runLenDecoder.readInteger() == maxDefLevel) {
+                byte[] bytes = readDataBinary(bytesLen).getBytesUnsafe();
+                bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length);
+            } else {
+                bytesVector.setNullAt(rowId + i);
+            }
+        }
+    }
+
+    @Override
+    protected void skipBatch(int num) {
+        int bytesLen = descriptor.getPrimitiveType().getTypeLength();
+
+        for (int i = 0; i < num; i++) {
+            if (runLenDecoder.readInteger() == maxDefLevel) {
+                skipDataBinary(bytesLen);
+            }
+        }
+    }
+
+    @Override
+    protected void readBatchFromDictionaryIds(
+            int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) {
+
+        WritableBytesVector bytesVector = (WritableBytesVector) column;
+        for (int i = rowId; i < rowId + num; ++i) {
+            if (!bytesVector.isNullAt(i)) {
+                byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytesUnsafe();
+                bytesVector.appendBytes(i, v, 0, v.length);
+            }
+        }
+    }
+}
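A note on the byte handling above (illustration, not part of the patch): readDataBinary wraps
buffers the reader owns via Binary.fromConstantByteArray, so getBytesUnsafe may hand back the
backing array without a defensive copy; the reader passes that range straight to appendBytes
and does not retain the array. A minimal standalone demo of those two parquet-mr calls:

    import org.apache.parquet.io.api.Binary;

    public class BinaryUnsafeDemo {
        public static void main(String[] args) {
            byte[] raw = {0x01, 0x02, 0x03};
            // "Constant" promises the caller will not mutate the array, so Binary
            // may wrap it without copying.
            Binary binary = Binary.fromConstantByteArray(raw);
            // getBytesUnsafe may return the backing array itself rather than a copy.
            byte[] out = binary.getBytesUnsafe();
            System.out.println(out.length == 3); // true
        }
    }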
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
index 25e1b466e465..536bf105378f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
@@ -18,11 +18,7 @@
 
 package org.apache.paimon.format.parquet.reader;
 
-import org.apache.paimon.data.columnar.writable.WritableBytesVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-import org.apache.paimon.data.columnar.writable.WritableIntVector;
-import org.apache.paimon.data.columnar.writable.WritableLongVector;
-import org.apache.paimon.format.parquet.ParquetSchemaConverter;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
@@ -33,10 +29,10 @@
 import java.nio.ByteBuffer;
 
 /** Fixed length bytes {@link ColumnReader}, just for Decimal. */
-public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector>
+public abstract class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector>
         extends AbstractColumnReader<VECTOR> {
 
-    private final int precision;
+    protected final int precision;
 
     public FixedLenBytesColumnReader(
             ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision)
@@ -46,116 +42,11 @@ public FixedLenBytesColumnReader(
         this.precision = precision;
     }
 
-    @Override
-    protected void readBatch(int rowId, int num, VECTOR column) {
-        int bytesLen = descriptor.getPrimitiveType().getTypeLength();
-        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-            WritableIntVector intVector = (WritableIntVector) column;
-            for (int i = 0; i < num; i++) {
-                if (runLenDecoder.readInteger() == maxDefLevel) {
-                    intVector.setInt(rowId + i, (int) heapBinaryToLong(readDataBinary(bytesLen)));
-                } else {
-                    intVector.setNullAt(rowId + i);
-                }
-            }
-        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-            WritableLongVector longVector = (WritableLongVector) column;
-            for (int i = 0; i < num; i++) {
-                if (runLenDecoder.readInteger() == maxDefLevel) {
-                    longVector.setLong(rowId + i, heapBinaryToLong(readDataBinary(bytesLen)));
-                } else {
-                    longVector.setNullAt(rowId + i);
-                }
-            }
-        } else {
-            WritableBytesVector bytesVector = (WritableBytesVector) column;
-            for (int i = 0; i < num; i++) {
-                if (runLenDecoder.readInteger() == maxDefLevel) {
-                    byte[] bytes = readDataBinary(bytesLen).getBytesUnsafe();
-                    bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length);
-                } else {
-                    bytesVector.setNullAt(rowId + i);
-                }
-            }
-        }
-    }
-
-    @Override
-    protected void skipBatch(int num) {
-        int bytesLen = descriptor.getPrimitiveType().getTypeLength();
-        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-            for (int i = 0; i < num; i++) {
-                if (runLenDecoder.readInteger() == maxDefLevel) {
-                    skipDataBinary(bytesLen);
-                }
-            }
-        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-
-            for (int i = 0; i < num; i++) {
-                if (runLenDecoder.readInteger() == maxDefLevel) {
-                    skipDataBinary(bytesLen);
-                }
-            }
-        } else {
-            for (int i = 0; i < num; i++) {
-                if (runLenDecoder.readInteger() == maxDefLevel) {
-                    skipDataBinary(bytesLen);
-                }
-            }
-        }
-    }
-
-    private void skipDataBinary(int len) {
+    protected void skipDataBinary(int len) {
         skipDataBuffer(len);
     }
 
-    @Override
-    protected void readBatchFromDictionaryIds(
-            int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) {
-        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-            WritableIntVector intVector = (WritableIntVector) column;
-            for (int i = rowId; i < rowId + num; ++i) {
-                if (!intVector.isNullAt(i)) {
-                    Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-                    intVector.setInt(i, (int) heapBinaryToLong(v));
-                }
-            }
-        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-            WritableLongVector longVector = (WritableLongVector) column;
-            for (int i = rowId; i < rowId + num; ++i) {
-                if (!longVector.isNullAt(i)) {
-                    Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-                    longVector.setLong(i, heapBinaryToLong(v));
-                }
-            }
-        } else {
-            WritableBytesVector bytesVector = (WritableBytesVector) column;
-            for (int i = rowId; i < rowId + num; ++i) {
-                if (!bytesVector.isNullAt(i)) {
-                    byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytesUnsafe();
-                    bytesVector.appendBytes(i, v, 0, v.length);
-                }
-            }
-        }
-    }
-
-    private long heapBinaryToLong(Binary binary) {
-        ByteBuffer buffer = binary.toByteBuffer();
-        byte[] bytes = buffer.array();
-        int start = buffer.arrayOffset() + buffer.position();
-        int end = buffer.arrayOffset() + buffer.limit();
-
-        long unscaled = 0L;
-
-        for (int i = start; i < end; i++) {
-            unscaled = (unscaled << 8) | (bytes[i] & 0xff);
-        }
-
-        int bits = 8 * (end - start);
-        return (unscaled << (64 - bits)) >> (64 - bits);
-    }
-
-    private Binary readDataBinary(int len) {
+    protected Binary readDataBinary(int len) {
         ByteBuffer buffer = readDataBuffer(len);
         if (buffer.hasArray()) {
             return Binary.fromConstantByteArray(
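The net effect of this diff: the base class keeps only the precision field and the shared
buffer helpers (skipDataBinary, readDataBinary), while batch decoding moves into the two
subclasses this patch introduces. A compilable toy sketch of the resulting template-method
split (names abridged and hypothetical, not the real signatures):

    abstract class FixedLenReaderSketch {
        protected final int precision;

        FixedLenReaderSketch(int precision) {
            this.precision = precision;
        }

        // Shared helper: fetch one fixed-length value's raw bytes.
        protected byte[] readDataBinary(int len) {
            return new byte[len]; // stands in for the real buffer handling
        }

        // Each subclass decides how a batch is decoded into its vector type.
        protected abstract void readBatch(int rowId, int num);
    }

    class BinarySketch extends FixedLenReaderSketch {
        BinarySketch(int length) {
            super(length);
        }

        @Override
        protected void readBatch(int rowId, int num) {
            // raw bytes go into a bytes vector unchanged
        }
    }

    class DecimalSketch extends FixedLenReaderSketch {
        DecimalSketch(int precision) {
            super(precision);
        }

        @Override
        protected void readBatch(int rowId, int num) {
            // bytes become an unscaled int/long, or stay raw, depending on precision
        }
    }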
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesDecimalColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesDecimalColumnReader.java
new file mode 100644
index 000000000000..f8bad2575097
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesDecimalColumnReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.columnar.writable.WritableBytesVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableIntVector;
+import org.apache.paimon.data.columnar.writable.WritableLongVector;
+import org.apache.paimon.format.parquet.ParquetSchemaConverter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.io.api.Binary;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** Fixed length bytes {@link ColumnReader}, just for Decimal. */
+public class FixedLenBytesDecimalColumnReader<VECTOR extends WritableColumnVector>
+        extends FixedLenBytesColumnReader<VECTOR> {
+
+    public FixedLenBytesDecimalColumnReader(
+            ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision)
+            throws IOException {
+        super(descriptor, pageReadStore, precision);
+    }
+
+    @Override
+    protected void readBatch(int rowId, int num, VECTOR column) {
+        int bytesLen = descriptor.getPrimitiveType().getTypeLength();
+        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+            WritableIntVector intVector = (WritableIntVector) column;
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    intVector.setInt(rowId + i, (int) heapBinaryToLong(readDataBinary(bytesLen)));
+                } else {
+                    intVector.setNullAt(rowId + i);
+                }
+            }
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+            WritableLongVector longVector = (WritableLongVector) column;
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    longVector.setLong(rowId + i, heapBinaryToLong(readDataBinary(bytesLen)));
+                } else {
+                    longVector.setNullAt(rowId + i);
+                }
+            }
+        } else {
+            WritableBytesVector bytesVector = (WritableBytesVector) column;
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    byte[] bytes = readDataBinary(bytesLen).getBytesUnsafe();
+                    bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length);
+                } else {
+                    bytesVector.setNullAt(rowId + i);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void skipBatch(int num) {
+        int bytesLen = descriptor.getPrimitiveType().getTypeLength();
+        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    skipDataBinary(bytesLen);
+                }
+            }
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    skipDataBinary(bytesLen);
+                }
+            }
+        } else {
+            for (int i = 0; i < num; i++) {
+                if (runLenDecoder.readInteger() == maxDefLevel) {
+                    skipDataBinary(bytesLen);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void readBatchFromDictionaryIds(
+            int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) {
+        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+            WritableIntVector intVector = (WritableIntVector) column;
+            for (int i = rowId; i < rowId + num; ++i) {
+                if (!intVector.isNullAt(i)) {
+                    Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+                    intVector.setInt(i, (int) heapBinaryToLong(v));
+                }
+            }
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+            WritableLongVector longVector = (WritableLongVector) column;
+            for (int i = rowId; i < rowId + num; ++i) {
+                if (!longVector.isNullAt(i)) {
+                    Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+                    longVector.setLong(i, heapBinaryToLong(v));
+                }
+            }
+        } else {
+            WritableBytesVector bytesVector = (WritableBytesVector) column;
+            for (int i = rowId; i < rowId + num; ++i) {
+                if (!bytesVector.isNullAt(i)) {
+                    byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytesUnsafe();
+                    bytesVector.appendBytes(i, v, 0, v.length);
+                }
+            }
+        }
+    }
+
+    private long heapBinaryToLong(Binary binary) {
+        ByteBuffer buffer = binary.toByteBuffer();
+        byte[] bytes = buffer.array();
+        int start = buffer.arrayOffset() + buffer.position();
+        int end = buffer.arrayOffset() + buffer.limit();
+
+        long unscaled = 0L;
+
+        for (int i = start; i < end; i++) {
+            unscaled = (unscaled << 8) | (bytes[i] & 0xff);
+        }
+
+        int bits = 8 * (end - start);
+        return (unscaled << (64 - bits)) >> (64 - bits);
+    }
+}
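What heapBinaryToLong computes, worked through on a concrete value (illustration, not part
of the patch): a DECIMAL(4, 2) value -1.23 has unscaled value -123; in a 2-byte
FIXED_LEN_BYTE_ARRAY it is stored big-endian two's-complement as 0xFF 0x85. Accumulating
the bytes naively yields 65413 with the sign lost, and shifting left then arithmetic-shifting
right by (64 - bits) sign-extends the 16-bit pattern back to -123:

    public class SignExtendDemo {
        public static void main(String[] args) {
            byte[] bytes = {(byte) 0xFF, (byte) 0x85};
            long unscaled = 0L;
            for (byte b : bytes) {
                unscaled = (unscaled << 8) | (b & 0xff); // ends at 0xFF85 = 65413
            }
            int bits = 8 * bytes.length; // 16
            long value = (unscaled << (64 - bits)) >> (64 - bits); // arithmetic shift
            System.out.println(value); // prints -123
        }
    }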
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index a2be77414d5a..c05714a1b565 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -36,6 +36,7 @@
 import org.apache.paimon.format.parquet.type.ParquetGroupField;
 import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
 import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BinaryType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
@@ -106,6 +107,11 @@ public static ColumnReader createColumnReader(
             case VARCHAR:
             case BINARY:
             case VARBINARY:
+                if (descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()
+                        == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+                    return new FixedLenBytesBinaryColumnReader(
+                            descriptors.get(0), pages, ((BinaryType) fieldType).getLength());
+                }
                 return new BytesColumnReader(descriptors.get(0), pages);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
@@ -123,7 +129,7 @@ public static ColumnReader createColumnReader(
                 case BINARY:
                     return new BytesColumnReader(descriptors.get(0), pages);
                 case FIXED_LEN_BYTE_ARRAY:
-                    return new FixedLenBytesColumnReader(
+                    return new FixedLenBytesDecimalColumnReader(
                             descriptors.get(0),
                             pages,
                             ((DecimalType) fieldType).getPrecision());
@@ -195,13 +201,19 @@ public static WritableColumnVector createWritableColumnVector(
             return new HeapShortVector(batchSize);
         case CHAR:
         case VARCHAR:
-        case BINARY:
         case VARBINARY:
             checkArgument(
                     typeName == PrimitiveType.PrimitiveTypeName.BINARY,
                     "Unexpected type: %s",
                     typeName);
             return new HeapBytesVector(batchSize);
+        case BINARY:
+            checkArgument(
+                    typeName == PrimitiveType.PrimitiveTypeName.BINARY
+                            || typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+                    "Unexpected type: %s",
+                    typeName);
+            return new HeapBytesVector(batchSize);
         case TIMESTAMP_WITHOUT_TIME_ZONE:
         case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
             int precision = DataTypeChecks.getPrecision(fieldType);
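The dispatch added above keys on the column's physical type, not the Paimon logical type: a
BINARY/VARBINARY field reads through FixedLenBytesBinaryColumnReader only when the underlying
Parquet primitive is FIXED_LEN_BYTE_ARRAY. A standalone illustration (not part of the patch;
the field names are invented) of the predicate it relies on:

    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class PrimitiveNameDemo {
        public static void main(String[] args) {
            PrimitiveType fixed =
                    Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(10).named("f1");
            PrimitiveType variable = Types.required(PrimitiveTypeName.BINARY).named("f0");
            System.out.println(fixed.getPrimitiveTypeName());    // FIXED_LEN_BYTE_ARRAY
            System.out.println(variable.getPrimitiveTypeName()); // BINARY
            System.out.println(fixed.getTypeLength());           // 10
        }
    }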
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 878414498315..c6728e338ffe 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -36,6 +36,7 @@
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
 import org.apache.paimon.types.BooleanType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -50,6 +51,7 @@
 import org.apache.paimon.types.SmallIntType;
 import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 
 import org.apache.hadoop.conf.Configuration;
@@ -61,6 +63,7 @@
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -556,6 +559,73 @@ public void testConvertToParquetTypeWithId() {
         assertThat(actual).isEqualTo(expected);
     }
 
+    @Test
+    public void testReadBinaryWrittenByParquet() throws Exception {
+        Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
+        Configuration conf = new Configuration();
+        MessageType schema =
+                new MessageType(
+                        "origin-parquet",
+                        Types.primitive(
+                                        PrimitiveType.PrimitiveTypeName.BINARY,
+                                        Type.Repetition.REQUIRED)
+                                .named("f0")
+                                .withId(0),
+                        Types.primitive(
+                                        PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+                                        Type.Repetition.REQUIRED)
+                                .length(10)
+                                .named("f1")
+                                .withId(1));
+
+        List<GenericRow> targetRows = new ArrayList<>();
+        try (ParquetWriter<Group> writer =
+                ExampleParquetWriter.builder(
+                                HadoopOutputFile.fromPath(
+                                        new org.apache.hadoop.fs.Path(path.toString()), conf))
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .withConf(new Configuration())
+                        .withType(schema)
+                        .build()) {
+            SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
+            for (int i = 0; i < 100; i++) {
+                Group row = simpleGroupFactory.newGroup();
+                byte[] randomLengthBytes = new byte[ThreadLocalRandom.current().nextInt(1, 100)];
+                byte[] fixedLengthBytes = new byte[10];
+                ThreadLocalRandom.current().nextBytes(randomLengthBytes);
+                ThreadLocalRandom.current().nextBytes(fixedLengthBytes);
+
+                targetRows.add(GenericRow.of(randomLengthBytes, fixedLengthBytes));
+
+                row.append("f0", Binary.fromConstantByteArray(randomLengthBytes));
+                row.append("f1", Binary.fromConstantByteArray(fixedLengthBytes));
+                writer.write(row);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Create data by parquet origin writer failed.", e);
+        }
+
+        RowType paimonRowType =
+                RowType.builder()
+                        .fields(new VarBinaryType(VarCharType.MAX_LENGTH), new BinaryType(10))
+                        .build();
+
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(new Options(), paimonRowType, 500, FilterCompat.NOOP);
+
+        RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(), path, new LocalFileIO().getFileSize(path)));
+        int[] index = {0};
+        reader.forEachRemaining(
+                row -> {
+                    GenericRow expected = targetRows.get(index[0]++);
+                    Assertions.assertArrayEquals(expected.getBinary(0), row.getBinary(0));
+                    Assertions.assertArrayEquals(expected.getBinary(1), row.getBinary(1));
+                });
+    }
+
     private void innerTestTypes(File folder, List records, int rowGroupSize)
             throws IOException {
         List rows = records.stream().map(this::newRow).collect(Collectors.toList());