From 938c8933710dce488c2571bd6604343a9bd30c4c Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 3 Jun 2024 11:23:39 +0800 Subject: [PATCH] [parquet] Parquet use vector IO (#3455) --- LICENSE | 4 + .../paimon/benchmark/TableReadBenchmark.java | 27 + .../format/parquet/ParquetInputFile.java | 3 +- .../format/parquet/ParquetInputStream.java | 4 + .../paimon/format/parquet/ParquetUtil.java | 2 +- .../parquet/hadoop/ParquetFileReader.java | 1532 +++++++++++++++++ .../apache/parquet/hadoop/ParquetWriter.java | 475 +++++ 7 files changed, 2044 insertions(+), 3 deletions(-) create mode 100644 paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java create mode 100644 paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java diff --git a/LICENSE b/LICENSE index 8f291d3a0a3c..38bbe5ec276d 100644 --- a/LICENSE +++ b/LICENSE @@ -266,6 +266,10 @@ paimon-format/src/main/java/org/apache/orc/OrcConf.java paimon-format/src/main/java/org/apache/orc/OrcFile.java from https://orc.apache.org/ version 2.0 +paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +from https://parquet.apache.org/ version 1.14.0 + MIT License ----------- diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java index 2c6a00dfa06f..535ab7652095 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java @@ -107,6 +107,33 @@ public void testOrcReadProjection1() throws Exception { */ } + @Test + public void testParquetReadProjection() throws Exception { + innerTestProjection( + Collections.singletonMap("parquet", prepareData(orc(), "parquet")), + new int[] {0, 5, 10, 14}); + /* + * OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16 + * Apple M1 Pro + * read: Best/Avg Time(ms) Row Rate(K/s) Per Row(ns) Relative + * ------------------------------------------------------------------------------------------------ + * OPERATORTEST_read_read-orc 716 / 728 4187.4 238.8 1.0X + */ + } + + @Test + public void testParquetReadProjection1() throws Exception { + innerTestProjection( + Collections.singletonMap("parquet", prepareData(orc(), "parquet")), new int[] {10}); + /* + * OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16 + * Apple M1 Pro + * read: Best/Avg Time(ms) Row Rate(K/s) Per Row(ns) Relative + * ------------------------------------------------------------------------------------------------ + * OPERATORTEST_read_read-orc 716 / 728 4187.4 238.8 1.0X + */ + } + private Options orc() { Options options = new Options(); options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_ORC); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetInputFile.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetInputFile.java index 01d53f8fad5f..7c52c249719f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetInputFile.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetInputFile.java @@ -23,7 +23,6 @@ import org.apache.paimon.fs.Path; import org.apache.parquet.io.InputFile; -import org.apache.parquet.io.SeekableInputStream; import java.io.IOException; @@ -52,7 +51,7 @@ public long getLength() { } @Override - public SeekableInputStream newStream() throws IOException { + public ParquetInputStream newStream() throws IOException { return new ParquetInputStream(fileIO.newInputStream(stat.getPath())); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetInputStream.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetInputStream.java index 9b62e8486d20..704ea44ffec3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetInputStream.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetInputStream.java @@ -34,6 +34,10 @@ public ParquetInputStream(SeekableInputStream in) { this.in = in; } + public SeekableInputStream in() { + return in; + } + @Override public long getPos() throws IOException { return in.getPos(); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java index 55faac0a2e85..055fe83f7c66 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java @@ -78,7 +78,7 @@ public class ParquetUtil { * @return parquet reader, used for reading footer, status, etc. */ public static ParquetFileReader getParquetReader(FileIO fileIO, Path path) throws IOException { - return ParquetFileReader.open( + return new ParquetFileReader( ParquetInputFile.fromPath(fileIO, path), ParquetReadOptions.builder().build()); } diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java new file mode 100644 index 000000000000..165a0141777e --- /dev/null +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -0,0 +1,1532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop; + +import org.apache.paimon.format.parquet.ParquetInputFile; +import org.apache.paimon.format.parquet.ParquetInputStream; +import org.apache.paimon.fs.FileRange; +import org.apache.paimon.fs.VectoredReadable; + +import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.DictionaryPageReadStore; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.InternalColumnDecryptionSetup; +import org.apache.parquet.crypto.InternalFileDecryptor; +import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.RowGroupFilter; +import org.apache.parquet.format.BlockCipher; +import org.apache.parquet.format.BloomFilterHeader; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.FileCryptoMetaData; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader; +import org.apache.parquet.hadoop.ColumnIndexFilterUtils.OffsetRange; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; +import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.yetus.audience.InterfaceAudience.Private; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.zip.CRC32; + +import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian; +import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.BLOOMFILTER; +import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY; +import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS; +import static org.apache.parquet.format.Util.readFileCryptoMetaData; +import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.calculateOffsetRanges; +import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.filterOffsetIndex; +import static org.apache.parquet.hadoop.ParquetFileWriter.EFMAGIC; +import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; + +/** + * Internal implementation of the Parquet file reader as a block container. + * + *

NOTE: The file was copied and modified to support {@link VectoredReadable}. + */ +public class ParquetFileReader implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetFileReader.class); + + private final ParquetMetadataConverter converter; + + private final CRC32 crc; + + private static ParquetMetadata readFooter( + InputFile file, + ParquetReadOptions options, + SeekableInputStream f, + ParquetMetadataConverter converter) + throws IOException { + + long fileLen = file.getLength(); + String filePath = file.toString(); + LOG.debug("File length {}", fileLen); + + int footerLengthSize = 4; + if (fileLen + < MAGIC.length + + footerLengthSize + + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC + throw new RuntimeException( + filePath + " is not a Parquet file (length is too low: " + fileLen + ")"); + } + + // Read footer length and magic string - with a single seek + byte[] magic = new byte[MAGIC.length]; + long fileMetadataLengthIndex = fileLen - magic.length - footerLengthSize; + LOG.debug("reading footer index at {}", fileMetadataLengthIndex); + f.seek(fileMetadataLengthIndex); + int fileMetadataLength = readIntLittleEndian(f); + f.readFully(magic); + + boolean encryptedFooterMode; + if (Arrays.equals(MAGIC, magic)) { + encryptedFooterMode = false; + } else if (Arrays.equals(EFMAGIC, magic)) { + encryptedFooterMode = true; + } else { + throw new RuntimeException( + filePath + + " is not a Parquet file. Expected magic number at tail, but found " + + Arrays.toString(magic)); + } + + long fileMetadataIndex = fileMetadataLengthIndex - fileMetadataLength; + LOG.debug( + "read footer length: {}, footer index: {}", fileMetadataLength, fileMetadataIndex); + if (fileMetadataIndex < magic.length || fileMetadataIndex >= fileMetadataLengthIndex) { + throw new RuntimeException( + "corrupted file: the footer index is not within the file: " + + fileMetadataIndex); + } + f.seek(fileMetadataIndex); + + FileDecryptionProperties fileDecryptionProperties = options.getDecryptionProperties(); + InternalFileDecryptor fileDecryptor = null; + if (null != fileDecryptionProperties) { + fileDecryptor = new InternalFileDecryptor(fileDecryptionProperties); + } + + // Read all the footer bytes in one time to avoid multiple read operations, + // since it can be pretty time consuming for a single read operation in HDFS. + ByteBuffer footerBytesBuffer = ByteBuffer.allocate(fileMetadataLength); + f.readFully(footerBytesBuffer); + LOG.debug("Finished to read all footer bytes."); + footerBytesBuffer.flip(); + InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer); + + // Regular file, or encrypted file with plaintext footer + if (!encryptedFooterMode) { + return converter.readParquetMetadata( + footerBytesStream, + options.getMetadataFilter(), + fileDecryptor, + false, + fileMetadataLength); + } + + // Encrypted file with encrypted footer + if (null == fileDecryptor) { + throw new ParquetCryptoRuntimeException( + "Trying to read file with encrypted footer. No keys available"); + } + FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream); + fileDecryptor.setFileCryptoMetaData( + fileCryptoMetaData.getEncryption_algorithm(), + true, + fileCryptoMetaData.getKey_metadata()); + // footer length is required only for signed plaintext footers + return converter.readParquetMetadata( + footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0); + } + + protected final ParquetInputStream f; + private final ParquetInputFile file; + private final ParquetReadOptions options; + private final Map paths = new HashMap<>(); + private final FileMetaData fileMetaData; // may be null + private final List blocks; + private final List blockIndexStores; + private final List blockRowRanges; + + // not final. in some cases, this may be lazily loaded for backward-compat. + private ParquetMetadata footer; + + private int currentBlock = 0; + private ColumnChunkPageReadStore currentRowGroup = null; + private DictionaryPageReader nextDictionaryReader = null; + + private InternalFileDecryptor fileDecryptor = null; + + public ParquetFileReader(ParquetInputFile file, ParquetReadOptions options) throws IOException { + this.converter = new ParquetMetadataConverter(options); + this.file = file; + this.f = file.newStream(); + this.options = options; + try { + this.footer = readFooter(file, options, f, converter); + } catch (Exception e) { + // In case that reading footer throws an exception in the constructor, the new stream + // should be closed. Otherwise, there's no way to close this outside. + f.close(); + throw e; + } + this.fileMetaData = footer.getFileMetaData(); + this.fileDecryptor = + fileMetaData.getFileDecryptor(); // must be called before filterRowGroups! + if (null != fileDecryptor && fileDecryptor.plaintextFile()) { + this.fileDecryptor = null; // Plaintext file. No need in decryptor + } + + try { + this.blocks = filterRowGroups(footer.getBlocks()); + } catch (Exception e) { + // In case that filterRowGroups throws an exception in the constructor, the new stream + // should be closed. Otherwise, there's no way to close this outside. + f.close(); + throw e; + } + this.blockIndexStores = listWithNulls(this.blocks.size()); + this.blockRowRanges = listWithNulls(this.blocks.size()); + for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { + paths.put(ColumnPath.get(col.getPath()), col); + } + this.crc = options.usePageChecksumVerification() ? new CRC32() : null; + } + + private static List listWithNulls(int size) { + return new ArrayList<>(Collections.nCopies(size, null)); + } + + public ParquetMetadata getFooter() { + if (footer == null) { + try { + // don't read the row groups because this.blocks is always set + this.footer = readFooter(file, options, f, converter); + } catch (IOException e) { + throw new ParquetDecodingException("Unable to read file footer", e); + } + } + return footer; + } + + public FileMetaData getFileMetaData() { + if (fileMetaData != null) { + return fileMetaData; + } + return getFooter().getFileMetaData(); + } + + public long getRecordCount() { + long total = 0L; + for (BlockMetaData block : blocks) { + total += block.getRowCount(); + } + return total; + } + + public long getFilteredRecordCount() { + if (!options.useColumnIndexFilter() + || !FilterCompat.isFilteringRequired(options.getRecordFilter())) { + return getRecordCount(); + } + long total = 0L; + for (int i = 0, n = blocks.size(); i < n; ++i) { + total += getRowRanges(i).rowCount(); + } + return total; + } + + /** + * @return the path for this file + * @deprecated will be removed in 2.0.0; use {@link #getFile()} instead + */ + @Deprecated + public Path getPath() { + return new Path(file.toString()); + } + + public String getFile() { + return file.toString(); + } + + private List filterRowGroups(List blocks) throws IOException { + FilterCompat.Filter recordFilter = options.getRecordFilter(); + if (FilterCompat.isFilteringRequired(recordFilter)) { + // set up data filters based on configured levels + List levels = new ArrayList<>(); + + if (options.useStatsFilter()) { + levels.add(STATISTICS); + } + + if (options.useDictionaryFilter()) { + levels.add(DICTIONARY); + } + + if (options.useBloomFilter()) { + levels.add(BLOOMFILTER); + } + return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); + } + + return blocks; + } + + public List getRowGroups() { + return blocks; + } + + public void setRequestedSchema(MessageType projection) { + paths.clear(); + for (ColumnDescriptor col : projection.getColumns()) { + paths.put(ColumnPath.get(col.getPath()), col); + } + } + + public void appendTo(ParquetFileWriter writer) throws IOException { + writer.appendRowGroups(f, blocks, true); + } + + /** + * Reads all the columns requested from the row group at the specified block. + * + * @param blockIndex the index of the requested block + * @throws IOException if an error occurs while reading + * @return the PageReadStore which can provide PageReaders for each column. + */ + public PageReadStore readRowGroup(int blockIndex) throws IOException { + return internalReadRowGroup(blockIndex); + } + + /** + * Reads all the columns requested from the row group at the current file position. + * + * @throws IOException if an error occurs while reading + * @return the PageReadStore which can provide PageReaders for each column. + */ + public PageReadStore readNextRowGroup() throws IOException { + ColumnChunkPageReadStore rowGroup = null; + try { + rowGroup = internalReadRowGroup(currentBlock); + } catch (ParquetEmptyBlockException e) { + LOG.warn("Read empty block at index {} from {}", currentBlock, getFile()); + advanceToNextBlock(); + return readNextRowGroup(); + } + + if (rowGroup == null) { + return null; + } + this.currentRowGroup = rowGroup; + // avoid re-reading bytes the dictionary reader is used after this call + if (nextDictionaryReader != null) { + nextDictionaryReader.setRowGroup(currentRowGroup); + } + + advanceToNextBlock(); + + return currentRowGroup; + } + + private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + BlockMetaData block = blocks.get(blockIndex); + if (block.getRowCount() == 0) { + throw new ParquetEmptyBlockException("Illegal row group of 0 rows"); + } + ColumnChunkPageReadStore rowGroup = + new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset()); + // prepare the list of consecutive parts to read them in one scan + List allParts = new ArrayList(); + ConsecutivePartList currentParts = null; + for (ColumnChunkMetaData mc : block.getColumns()) { + ColumnPath pathKey = mc.getPath(); + ColumnDescriptor columnDescriptor = paths.get(pathKey); + if (columnDescriptor != null) { + BenchmarkCounter.incrementTotalBytes(mc.getTotalSize()); + long startingPos = mc.getStartingPos(); + // first part or not consecutive => new list + if (currentParts == null || currentParts.endPos() != startingPos) { + currentParts = new ConsecutivePartList(startingPos); + allParts.add(currentParts); + } + currentParts.addChunk( + new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize())); + } + } + // actually read all the chunks + ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); + readAllPartsVectoredOrNormal(allParts, builder); + for (Chunk chunk : builder.build()) { + readChunkPages(chunk, block, rowGroup); + } + + return rowGroup; + } + + /** + * Reads all the columns requested from the specified row group. It may skip specific pages + * based on the column indexes according to the actual filter. As the rows are not aligned among + * the pages of the different columns row synchronization might be required. See the + * documentation of the class SynchronizingColumnReader for details. + * + * @param blockIndex the index of the requested block + * @return the PageReadStore which can provide PageReaders for each column or null if there are + * no rows in this block + * @throws IOException if an error occurs while reading + */ + public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + + // Filtering not required -> fall back to the non-filtering path + if (!options.useColumnIndexFilter() + || !FilterCompat.isFilteringRequired(options.getRecordFilter())) { + return internalReadRowGroup(blockIndex); + } + + BlockMetaData block = blocks.get(blockIndex); + if (block.getRowCount() == 0) { + throw new ParquetEmptyBlockException("Illegal row group of 0 rows"); + } + + RowRanges rowRanges = getRowRanges(blockIndex); + return readFilteredRowGroup(blockIndex, rowRanges); + } + + /** + * Reads all the columns requested from the specified row group. It may skip specific pages + * based on the {@code rowRanges} passed in. As the rows are not aligned among the pages of the + * different columns row synchronization might be required. See the documentation of the class + * SynchronizingColumnReader for details. + * + * @param blockIndex the index of the requested block + * @param rowRanges the row ranges to be read from the requested block + * @return the PageReadStore which can provide PageReaders for each column or null if there are + * no rows in this block + * @throws IOException if an error occurs while reading + * @throws IllegalArgumentException if the {@code blockIndex} is invalid or the {@code + * rowRanges} is null + */ + public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges rowRanges) + throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + throw new IllegalArgumentException( + String.format( + "Invalid block index %s, the valid block index range are: " + + "[%s, %s]", + blockIndex, 0, blocks.size() - 1)); + } + + if (Objects.isNull(rowRanges)) { + throw new IllegalArgumentException("RowRanges must not be null"); + } + + BlockMetaData block = blocks.get(blockIndex); + if (block.getRowCount() == 0L) { + return null; + } + + long rowCount = rowRanges.rowCount(); + if (rowCount == 0) { + // There are no matching rows -> returning null + return null; + } + + if (rowCount == block.getRowCount()) { + // All rows are matching -> fall back to the non-filtering path + return internalReadRowGroup(blockIndex); + } + + return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex)); + } + + /** + * Read data in all parts via either vectored IO or serial IO. + * + * @param allParts all parts to be read. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException any IOE. + */ + private void readAllPartsVectoredOrNormal( + List allParts, ChunkListBuilder builder) throws IOException { + if (shouldUseVectoredIo()) { + try { + readVectored(allParts, builder); + return; + } catch (IllegalArgumentException | UnsupportedOperationException e) { + // Either the arguments are wrong or somehow this is being invoked against + // a hadoop release which doesn't have the API and yet somehow it got here. + LOG.warn("readVectored() failed; falling back to normal IO against {}", f, e); + } + } + for (ConsecutivePartList consecutiveChunks : allParts) { + consecutiveChunks.readAll(f, builder); + } + } + + /** Should the read use vectored IO. */ + private boolean shouldUseVectoredIo() { + return f.in() instanceof VectoredReadable; + } + + /** + * Read all parts through vectored IO. + * + *

The API is available in recent hadoop builds for all implementations of + * PositionedReadable; the default implementation simply does a sequence of reads at different + * offsets. + * + *

If directly implemented by a Filesystem then it is likely to be a more efficient operation + * such as a scatter-gather read (native IO) or set of parallel GET requests against an object + * store. + * + * @param allParts all parts to be read. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException any IOE. + * @throws IllegalArgumentException arguments are invalid. + * @throws UnsupportedOperationException if the filesystem does not support vectored IO. + */ + @SuppressWarnings("checkstyle:JavadocParagraph") + private void readVectored(List allParts, ChunkListBuilder builder) + throws IOException { + List ranges = new ArrayList<>(allParts.size()); + long totalSize = 0; + for (ConsecutivePartList consecutiveChunks : allParts) { + final long len = consecutiveChunks.length; + Preconditions.checkArgument( + len < Integer.MAX_VALUE, + "Invalid length %s for vectored read operation. It must be less than max integer value.", + len); + ranges.add(FileRange.createFileRange(consecutiveChunks.offset, (int) len)); + totalSize += len; + } + LOG.debug( + "Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size()); + // Request a vectored read; + ((VectoredReadable) f.in()).readVectored(ranges); + int k = 0; + for (ConsecutivePartList consecutivePart : allParts) { + FileRange currRange = ranges.get(k++); + consecutivePart.readFromVectoredRange(currRange, builder); + } + } + + /** + * Reads all the columns requested from the row group at the current file position. It may skip + * specific pages based on the column indexes according to the actual filter. As the rows are + * not aligned among the pages of the different columns row synchronization might be required. + * See the documentation of the class SynchronizingColumnReader for details. + * + * @return the PageReadStore which can provide PageReaders for each column + * @throws IOException if an error occurs while reading + */ + public PageReadStore readNextFilteredRowGroup() throws IOException { + if (currentBlock == blocks.size()) { + return null; + } + // Filtering not required -> fall back to the non-filtering path + if (!options.useColumnIndexFilter() + || !FilterCompat.isFilteringRequired(options.getRecordFilter())) { + return readNextRowGroup(); + } + BlockMetaData block = blocks.get(currentBlock); + if (block.getRowCount() == 0L) { + LOG.warn("Read empty block at index {} from {}", currentBlock, getFile()); + // Skip the empty block + advanceToNextBlock(); + return readNextFilteredRowGroup(); + } + RowRanges rowRanges = getRowRanges(currentBlock); + long rowCount = rowRanges.rowCount(); + if (rowCount == 0) { + // There are no matching rows -> skipping this row-group + advanceToNextBlock(); + return readNextFilteredRowGroup(); + } + if (rowCount == block.getRowCount()) { + // All rows are matching -> fall back to the non-filtering path + return readNextRowGroup(); + } + + this.currentRowGroup = + internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(currentBlock)); + + // avoid re-reading bytes the dictionary reader is used after this call + if (nextDictionaryReader != null) { + nextDictionaryReader.setRowGroup(currentRowGroup); + } + + advanceToNextBlock(); + + return this.currentRowGroup; + } + + private ColumnChunkPageReadStore internalReadFilteredRowGroup( + BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException { + ColumnChunkPageReadStore rowGroup = + new ColumnChunkPageReadStore(rowRanges, block.getRowIndexOffset()); + // prepare the list of consecutive parts to read them in one scan + ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); + List allParts = new ArrayList<>(); + ConsecutivePartList currentParts = null; + for (ColumnChunkMetaData mc : block.getColumns()) { + ColumnPath pathKey = mc.getPath(); + ColumnDescriptor columnDescriptor = paths.get(pathKey); + if (columnDescriptor != null) { + OffsetIndex offsetIndex = ciStore.getOffsetIndex(mc.getPath()); + + OffsetIndex filteredOffsetIndex = + filterOffsetIndex(offsetIndex, rowRanges, block.getRowCount()); + for (OffsetRange range : + calculateOffsetRanges(filteredOffsetIndex, mc, offsetIndex.getOffset(0))) { + BenchmarkCounter.incrementTotalBytes(range.getLength()); + long startingPos = range.getOffset(); + // first part or not consecutive => new list + if (currentParts == null || currentParts.endPos() != startingPos) { + currentParts = new ConsecutivePartList(startingPos); + allParts.add(currentParts); + } + ChunkDescriptor chunkDescriptor = + new ChunkDescriptor( + columnDescriptor, mc, startingPos, range.getLength()); + currentParts.addChunk(chunkDescriptor); + builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex); + } + } + } + // actually read all the chunks + readAllPartsVectoredOrNormal(allParts, builder); + for (Chunk chunk : builder.build()) { + readChunkPages(chunk, block, rowGroup); + } + + return rowGroup; + } + + private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageReadStore rowGroup) + throws IOException { + if (null == fileDecryptor || fileDecryptor.plaintextFile()) { + rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); + return; + } + // Encrypted file + ColumnPath columnPath = ColumnPath.get(chunk.descriptor.col.getPath()); + InternalColumnDecryptionSetup columnDecryptionSetup = + fileDecryptor.getColumnSetup(columnPath); + if (!columnDecryptionSetup.isEncrypted()) { // plaintext column + rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); + } else { // encrypted column + rowGroup.addColumn( + chunk.descriptor.col, + chunk.readAllPages( + columnDecryptionSetup.getMetaDataDecryptor(), + columnDecryptionSetup.getDataDecryptor(), + fileDecryptor.getFileAAD(), + block.getOrdinal(), + columnDecryptionSetup.getOrdinal())); + } + } + + public ColumnIndexStore getColumnIndexStore(int blockIndex) { + ColumnIndexStore ciStore = blockIndexStores.get(blockIndex); + if (ciStore == null) { + ciStore = ColumnIndexStoreImpl.create(this, blocks.get(blockIndex), paths.keySet()); + blockIndexStores.set(blockIndex, ciStore); + } + return ciStore; + } + + private RowRanges getRowRanges(int blockIndex) { + assert FilterCompat.isFilteringRequired(options.getRecordFilter()) + : "Should not be invoked if filter is null or NOOP"; + RowRanges rowRanges = blockRowRanges.get(blockIndex); + if (rowRanges == null) { + rowRanges = + ColumnIndexFilter.calculateRowRanges( + options.getRecordFilter(), + getColumnIndexStore(blockIndex), + paths.keySet(), + blocks.get(blockIndex).getRowCount()); + blockRowRanges.set(blockIndex, rowRanges); + } + return rowRanges; + } + + public boolean skipNextRowGroup() { + return advanceToNextBlock(); + } + + private boolean advanceToNextBlock() { + if (currentBlock == blocks.size()) { + return false; + } + + // update the current block and instantiate a dictionary reader for it + ++currentBlock; + this.nextDictionaryReader = null; + + return true; + } + + /** + * Returns a {@link DictionaryPageReadStore} for the row group that would be returned by calling + * {@link #readNextRowGroup()} or skipped by calling {@link #skipNextRowGroup()}. + * + * @return a DictionaryPageReadStore for the next row group + */ + public DictionaryPageReadStore getNextDictionaryReader() { + if (nextDictionaryReader == null) { + this.nextDictionaryReader = getDictionaryReader(currentBlock); + } + return nextDictionaryReader; + } + + public DictionaryPageReader getDictionaryReader(int blockIndex) { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + return new DictionaryPageReader(this, blocks.get(blockIndex)); + } + + public DictionaryPageReader getDictionaryReader(BlockMetaData block) { + return new DictionaryPageReader(this, block); + } + + /** + * Reads and decompresses a dictionary page for the given column chunk. + * + *

Returns null if the given column chunk has no dictionary page. + * + * @param meta a column's ColumnChunkMetaData to read the dictionary from + * @return an uncompressed DictionaryPage or null + * @throws IOException if there is an error while reading the dictionary + */ + DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException { + if (!meta.hasDictionaryPage()) { + return null; + } + + // TODO: this should use getDictionaryPageOffset() but it isn't reliable. + if (f.getPos() != meta.getStartingPos()) { + f.seek(meta.getStartingPos()); + } + + boolean encryptedColumn = false; + InternalColumnDecryptionSetup columnDecryptionSetup = null; + byte[] dictionaryPageAAD = null; + BlockCipher.Decryptor pageDecryptor = null; + if (null != fileDecryptor && !fileDecryptor.plaintextFile()) { + columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath()); + if (columnDecryptionSetup.isEncrypted()) { + encryptedColumn = true; + } + } + + PageHeader pageHeader; + if (!encryptedColumn) { + pageHeader = Util.readPageHeader(f); + } else { + byte[] dictionaryPageHeaderAAD = + AesCipher.createModuleAAD( + fileDecryptor.getFileAAD(), + ModuleType.DictionaryPageHeader, + meta.getRowGroupOrdinal(), + columnDecryptionSetup.getOrdinal(), + -1); + pageHeader = + Util.readPageHeader( + f, + columnDecryptionSetup.getMetaDataDecryptor(), + dictionaryPageHeaderAAD); + dictionaryPageAAD = + AesCipher.createModuleAAD( + fileDecryptor.getFileAAD(), + ModuleType.DictionaryPage, + meta.getRowGroupOrdinal(), + columnDecryptionSetup.getOrdinal(), + -1); + pageDecryptor = columnDecryptionSetup.getDataDecryptor(); + } + + if (!pageHeader.isSetDictionary_page_header()) { + return null; // TODO: should this complain? + } + + DictionaryPage compressedPage = + readCompressedDictionary(pageHeader, f, pageDecryptor, dictionaryPageAAD); + BytesInputDecompressor decompressor = + options.getCodecFactory().getDecompressor(meta.getCodec()); + + return new DictionaryPage( + decompressor.decompress( + compressedPage.getBytes(), compressedPage.getUncompressedSize()), + compressedPage.getDictionarySize(), + compressedPage.getEncoding()); + } + + private DictionaryPage readCompressedDictionary( + PageHeader pageHeader, + SeekableInputStream fin, + BlockCipher.Decryptor pageDecryptor, + byte[] dictionaryPageAAD) + throws IOException { + DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header(); + + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + + byte[] dictPageBytes = new byte[compressedPageSize]; + fin.readFully(dictPageBytes); + + BytesInput bin = BytesInput.from(dictPageBytes); + + if (null != pageDecryptor) { + bin = BytesInput.from(pageDecryptor.decrypt(bin.toByteArray(), dictionaryPageAAD)); + } + + return new DictionaryPage( + bin, + uncompressedPageSize, + dictHeader.getNum_values(), + converter.getEncoding(dictHeader.getEncoding())); + } + + public BloomFilterReader getBloomFilterDataReader(int blockIndex) { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + return new BloomFilterReader(this, blocks.get(blockIndex)); + } + + public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) { + return new BloomFilterReader(this, block); + } + + /** + * Reads Bloom filter data for the given column chunk. + * + * @param meta a column's ColumnChunkMetaData to read the dictionary from + * @return an BloomFilter object. + * @throws IOException if there is an error while reading the Bloom filter. + */ + public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException { + long bloomFilterOffset = meta.getBloomFilterOffset(); + if (bloomFilterOffset < 0) { + return null; + } + + // Prepare to decrypt Bloom filter (for encrypted columns) + BlockCipher.Decryptor bloomFilterDecryptor = null; + byte[] bloomFilterHeaderAAD = null; + byte[] bloomFilterBitsetAAD = null; + if (null != fileDecryptor && !fileDecryptor.plaintextFile()) { + InternalColumnDecryptionSetup columnDecryptionSetup = + fileDecryptor.getColumnSetup(meta.getPath()); + if (columnDecryptionSetup.isEncrypted()) { + bloomFilterDecryptor = columnDecryptionSetup.getMetaDataDecryptor(); + bloomFilterHeaderAAD = + AesCipher.createModuleAAD( + fileDecryptor.getFileAAD(), + ModuleType.BloomFilterHeader, + meta.getRowGroupOrdinal(), + columnDecryptionSetup.getOrdinal(), + -1); + bloomFilterBitsetAAD = + AesCipher.createModuleAAD( + fileDecryptor.getFileAAD(), + ModuleType.BloomFilterBitset, + meta.getRowGroupOrdinal(), + columnDecryptionSetup.getOrdinal(), + -1); + } + } + + // Read Bloom filter data header. + f.seek(bloomFilterOffset); + BloomFilterHeader bloomFilterHeader; + try { + bloomFilterHeader = + Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD); + } catch (IOException e) { + LOG.warn("read no bloom filter"); + return null; + } + + int numBytes = bloomFilterHeader.getNumBytes(); + if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.UPPER_BOUND_BYTES) { + LOG.warn( + "the read bloom filter size is wrong, size is {}", + bloomFilterHeader.getNumBytes()); + return null; + } + + if (!bloomFilterHeader.getHash().isSetXXHASH() + || !bloomFilterHeader.getAlgorithm().isSetBLOCK() + || !bloomFilterHeader.getCompression().isSetUNCOMPRESSED()) { + LOG.warn( + "the read bloom filter is not supported yet, algorithm = {}, hash = {}, compression = {}", + bloomFilterHeader.getAlgorithm(), + bloomFilterHeader.getHash(), + bloomFilterHeader.getCompression()); + return null; + } + + byte[] bitset; + if (null == bloomFilterDecryptor) { + bitset = new byte[numBytes]; + f.readFully(bitset); + } else { + bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD); + if (bitset.length != numBytes) { + throw new ParquetCryptoRuntimeException( + "Wrong length of decrypted bloom filter bitset"); + } + } + return new BlockSplitBloomFilter(bitset); + } + + /** + * @param column the column chunk which the column index is to be returned for + * @return the column index for the specified column chunk or {@code null} if there is no index + * @throws IOException if any I/O error occurs during reading the file + */ + @Private + public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOException { + IndexReference ref = column.getColumnIndexReference(); + if (ref == null) { + return null; + } + f.seek(ref.getOffset()); + + BlockCipher.Decryptor columnIndexDecryptor = null; + byte[] columnIndexAAD = null; + if (null != fileDecryptor && !fileDecryptor.plaintextFile()) { + InternalColumnDecryptionSetup columnDecryptionSetup = + fileDecryptor.getColumnSetup(column.getPath()); + if (columnDecryptionSetup.isEncrypted()) { + columnIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor(); + columnIndexAAD = + AesCipher.createModuleAAD( + fileDecryptor.getFileAAD(), + ModuleType.ColumnIndex, + column.getRowGroupOrdinal(), + columnDecryptionSetup.getOrdinal(), + -1); + } + } + return ParquetMetadataConverter.fromParquetColumnIndex( + column.getPrimitiveType(), + Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD)); + } + + /** + * @param column the column chunk which the offset index is to be returned for + * @return the offset index for the specified column chunk or {@code null} if there is no index + * @throws IOException if any I/O error occurs during reading the file + */ + @Private + public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOException { + IndexReference ref = column.getOffsetIndexReference(); + if (ref == null) { + return null; + } + f.seek(ref.getOffset()); + + BlockCipher.Decryptor offsetIndexDecryptor = null; + byte[] offsetIndexAAD = null; + if (null != fileDecryptor && !fileDecryptor.plaintextFile()) { + InternalColumnDecryptionSetup columnDecryptionSetup = + fileDecryptor.getColumnSetup(column.getPath()); + if (columnDecryptionSetup.isEncrypted()) { + offsetIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor(); + offsetIndexAAD = + AesCipher.createModuleAAD( + fileDecryptor.getFileAAD(), + ModuleType.OffsetIndex, + column.getRowGroupOrdinal(), + columnDecryptionSetup.getOrdinal(), + -1); + } + } + return ParquetMetadataConverter.fromParquetOffsetIndex( + Util.readOffsetIndex(f, offsetIndexDecryptor, offsetIndexAAD)); + } + + @Override + public void close() throws IOException { + try { + if (f != null) { + f.close(); + } + } finally { + options.getCodecFactory().release(); + } + } + + /* + * Builder to concatenate the buffers of the discontinuous parts for the same column. These parts are generated as a + * result of the column-index based filtering when some pages might be skipped at reading. + */ + private class ChunkListBuilder { + private class ChunkData { + final List buffers = new ArrayList<>(); + OffsetIndex offsetIndex; + } + + private final Map map = new HashMap<>(); + private ChunkDescriptor lastDescriptor; + private final long rowCount; + private SeekableInputStream f; + + public ChunkListBuilder(long rowCount) { + this.rowCount = rowCount; + } + + void add(ChunkDescriptor descriptor, List buffers, SeekableInputStream f) { + map.computeIfAbsent(descriptor, d -> new ChunkData()).buffers.addAll(buffers); + lastDescriptor = descriptor; + this.f = f; + } + + void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) { + map.computeIfAbsent(descriptor, d -> new ChunkData()).offsetIndex = offsetIndex; + } + + List build() { + Set> entries = map.entrySet(); + List chunks = new ArrayList<>(entries.size()); + for (Entry entry : entries) { + ChunkDescriptor descriptor = entry.getKey(); + ChunkData data = entry.getValue(); + if (descriptor.equals(lastDescriptor)) { + // because of a bug, the last chunk might be larger than descriptor.size + chunks.add( + new WorkaroundChunk( + lastDescriptor, data.buffers, f, data.offsetIndex, rowCount)); + } else { + chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex, rowCount)); + } + } + return chunks; + } + } + + /** The data for a column chunk. */ + private class Chunk { + + protected final ChunkDescriptor descriptor; + protected final ByteBufferInputStream stream; + final OffsetIndex offsetIndex; + final long rowCount; + + /** + * @param descriptor descriptor for the chunk + * @param buffers ByteBuffers that contain the chunk + * @param offsetIndex the offset index for this column; might be null + */ + public Chunk( + ChunkDescriptor descriptor, + List buffers, + OffsetIndex offsetIndex, + long rowCount) { + this.descriptor = descriptor; + this.stream = ByteBufferInputStream.wrap(buffers); + this.offsetIndex = offsetIndex; + this.rowCount = rowCount; + } + + protected PageHeader readPageHeader() throws IOException { + return readPageHeader(null, null); + } + + protected PageHeader readPageHeader( + BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD) throws IOException { + return Util.readPageHeader(stream, blockDecryptor, pageHeaderAAD); + } + + /** + * Calculate checksum of input bytes, throw decoding exception if it does not match the + * provided reference crc. + */ + private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) { + crc.reset(); + crc.update(bytes); + if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) { + throw new ParquetDecodingException(exceptionMsg); + } + } + + /** + * Read all of the pages in a given column chunk. + * + * @return the list of pages + */ + public ColumnChunkPageReader readAllPages() throws IOException { + return readAllPages(null, null, null, -1, -1); + } + + public ColumnChunkPageReader readAllPages( + BlockCipher.Decryptor headerBlockDecryptor, + BlockCipher.Decryptor pageBlockDecryptor, + byte[] aadPrefix, + int rowGroupOrdinal, + int columnOrdinal) + throws IOException { + List pagesInChunk = new ArrayList<>(); + DictionaryPage dictionaryPage = null; + PrimitiveType type = + getFileMetaData() + .getSchema() + .getType(descriptor.col.getPath()) + .asPrimitiveType(); + long valuesCountReadSoFar = 0L; + int dataPageCountReadSoFar = 0; + byte[] dataPageHeaderAAD = null; + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = + AesCipher.createModuleAAD( + aadPrefix, + ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, + getPageOrdinal(dataPageCountReadSoFar)); + } + while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) { + byte[] pageHeaderAAD = dataPageHeaderAAD; + if (null != headerBlockDecryptor) { + // Important: this verifies file integrity (makes sure dictionary page had not + // been removed) + if (null == dictionaryPage && descriptor.metadata.hasDictionaryPage()) { + pageHeaderAAD = + AesCipher.createModuleAAD( + aadPrefix, + ModuleType.DictionaryPageHeader, + rowGroupOrdinal, + columnOrdinal, + -1); + } else { + int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar); + AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); + } + } + PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput pageBytes; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + // there is only one dictionary page per column chunk + if (dictionaryPage != null) { + throw new ParquetDecodingException( + "more than one dictionary page in column " + descriptor.col); + } + pageBytes = this.readAsBytesInput(compressedPageSize); + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + verifyCrc( + pageHeader.getCrc(), + pageBytes.toByteArray(), + "could not verify dictionary page integrity, CRC checksum verification failed"); + } + DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + dictionaryPage = + new DictionaryPage( + pageBytes, + uncompressedPageSize, + dicHeader.getNum_values(), + converter.getEncoding(dicHeader.getEncoding())); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + dictionaryPage.setCrc(pageHeader.getCrc()); + } + break; + case DATA_PAGE: + DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); + pageBytes = this.readAsBytesInput(compressedPageSize); + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + verifyCrc( + pageHeader.getCrc(), + pageBytes.toByteArray(), + "could not verify page integrity, CRC checksum verification failed"); + } + DataPageV1 dataPageV1 = + new DataPageV1( + pageBytes, + dataHeaderV1.getNum_values(), + uncompressedPageSize, + converter.fromParquetStatistics( + getFileMetaData().getCreatedBy(), + dataHeaderV1.getStatistics(), + type), + converter.getEncoding( + dataHeaderV1.getRepetition_level_encoding()), + converter.getEncoding( + dataHeaderV1.getDefinition_level_encoding()), + converter.getEncoding(dataHeaderV1.getEncoding())); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + dataPageV1.setCrc(pageHeader.getCrc()); + } + pagesInChunk.add(dataPageV1); + valuesCountReadSoFar += dataHeaderV1.getNum_values(); + ++dataPageCountReadSoFar; + break; + case DATA_PAGE_V2: + DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); + int dataSize = + compressedPageSize + - dataHeaderV2.getRepetition_levels_byte_length() + - dataHeaderV2.getDefinition_levels_byte_length(); + pagesInChunk.add( + new DataPageV2( + dataHeaderV2.getNum_rows(), + dataHeaderV2.getNum_nulls(), + dataHeaderV2.getNum_values(), + this.readAsBytesInput( + dataHeaderV2.getRepetition_levels_byte_length()), + this.readAsBytesInput( + dataHeaderV2.getDefinition_levels_byte_length()), + converter.getEncoding(dataHeaderV2.getEncoding()), + this.readAsBytesInput(dataSize), + uncompressedPageSize, + converter.fromParquetStatistics( + getFileMetaData().getCreatedBy(), + dataHeaderV2.getStatistics(), + type), + dataHeaderV2.isIs_compressed())); + valuesCountReadSoFar += dataHeaderV2.getNum_values(); + ++dataPageCountReadSoFar; + break; + default: + LOG.debug( + "skipping page of type {} of size {}", + pageHeader.getType(), + compressedPageSize); + stream.skipFully(compressedPageSize); + break; + } + } + if (offsetIndex == null + && valuesCountReadSoFar != descriptor.metadata.getValueCount()) { + // Would be nice to have a CorruptParquetFileException or something as a subclass? + throw new IOException( + "Expected " + + descriptor.metadata.getValueCount() + + " values in column chunk at " + + getPath() + + " offset " + + descriptor.metadata.getFirstDataPageOffset() + + " but got " + + valuesCountReadSoFar + + " values instead over " + + pagesInChunk.size() + + " pages ending at file offset " + + (descriptor.fileOffset + stream.position())); + } + BytesInputDecompressor decompressor = + options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec()); + return new ColumnChunkPageReader( + decompressor, + pagesInChunk, + dictionaryPage, + offsetIndex, + rowCount, + pageBlockDecryptor, + aadPrefix, + rowGroupOrdinal, + columnOrdinal); + } + + private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) { + return offsetIndex == null + ? valuesCountReadSoFar < descriptor.metadata.getValueCount() + : dataPageCountReadSoFar < offsetIndex.getPageCount(); + } + + private int getPageOrdinal(int dataPageCountReadSoFar) { + if (null == offsetIndex) { + return dataPageCountReadSoFar; + } + + return offsetIndex.getPageOrdinal(dataPageCountReadSoFar); + } + + /** + * @param size the size of the page + * @return the page + * @throws IOException if there is an error while reading from the file stream + */ + public BytesInput readAsBytesInput(int size) throws IOException { + return BytesInput.from(stream.sliceBuffers(size)); + } + } + + /** deals with a now fixed bug where compressedLength was missing a few bytes. */ + private class WorkaroundChunk extends Chunk { + + private final SeekableInputStream f; + + /** + * @param descriptor the descriptor of the chunk + * @param f the file stream positioned at the end of this chunk + */ + private WorkaroundChunk( + ChunkDescriptor descriptor, + List buffers, + SeekableInputStream f, + OffsetIndex offsetIndex, + long rowCount) { + super(descriptor, buffers, offsetIndex, rowCount); + this.f = f; + } + + protected PageHeader readPageHeader() throws IOException { + PageHeader pageHeader; + stream.mark(8192); // headers should not be larger than 8k + try { + pageHeader = Util.readPageHeader(stream); + } catch (IOException e) { + // this is to workaround a bug where the compressedLength + // of the chunk is missing the size of the header of the dictionary + // to allow reading older files (using dictionary) we need this. + // usually 13 to 19 bytes are missing + // if the last page is smaller than this, the page header itself is truncated in the + // buffer. + stream.reset(); // resetting the buffer to the position before we got the error + LOG.info("completing the column chunk to read the page header"); + pageHeader = + Util.readPageHeader( + new SequenceInputStream( + stream, + f)); // trying again from the buffer + remainder of the + // stream. + } + return pageHeader; + } + + public BytesInput readAsBytesInput(int size) throws IOException { + int available = stream.available(); + if (size > available) { + // this is to workaround a bug where the compressedLength + // of the chunk is missing the size of the header of the dictionary + // to allow reading older files (using dictionary) we need this. + // usually 13 to 19 bytes are missing + int missingBytes = size - available; + LOG.info("completed the column chunk with {} bytes", missingBytes); + + List streamBuffers = stream.sliceBuffers(available); + + ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes); + f.readFully(lastBuffer); + + List buffers = new ArrayList<>(streamBuffers.size() + 1); + buffers.addAll(streamBuffers); + buffers.add(lastBuffer); + + return BytesInput.from(buffers); + } + + return super.readAsBytesInput(size); + } + } + + /** Information needed to read a column chunk or a part of it. */ + private static class ChunkDescriptor { + + private final ColumnDescriptor col; + private final ColumnChunkMetaData metadata; + private final long fileOffset; + private final long size; + + /** + * @param col column this chunk is part of + * @param metadata metadata for the column + * @param fileOffset offset in the file where this chunk starts + * @param size size of the chunk + */ + private ChunkDescriptor( + ColumnDescriptor col, ColumnChunkMetaData metadata, long fileOffset, long size) { + super(); + this.col = col; + this.metadata = metadata; + this.fileOffset = fileOffset; + this.size = size; + } + + @Override + public int hashCode() { + return col.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj instanceof ChunkDescriptor) { + return col.equals(((ChunkDescriptor) obj).col); + } else { + return false; + } + } + } + + /** + * Describes a list of consecutive parts to be read at once. A consecutive part may contain + * whole column chunks or only parts of them (some pages). + */ + private class ConsecutivePartList { + + private final long offset; + private long length; + private final List chunks = new ArrayList<>(); + + /** @param offset where the first chunk starts */ + ConsecutivePartList(long offset) { + this.offset = offset; + } + + /** + * adds a chunk to the list. It must be consecutive to the previous chunk + * + * @param descriptor a chunk descriptor + */ + public void addChunk(ChunkDescriptor descriptor) { + chunks.add(descriptor); + length += descriptor.size; + } + + /** + * @param f file to read the chunks from + * @param builder used to build chunk list to read the pages for the different columns + * @throws IOException if there is an error while reading from the stream + */ + public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException { + f.seek(offset); + + int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize()); + int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize()); + + int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0); + List buffers = new ArrayList<>(numAllocations); + + for (int i = 0; i < fullAllocations; i += 1) { + buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize())); + } + + if (lastAllocationSize > 0) { + buffers.add(options.getAllocator().allocate(lastAllocationSize)); + } + + for (ByteBuffer buffer : buffers) { + f.readFully(buffer); + buffer.flip(); + } + + // report in a counter the data we just scanned + BenchmarkCounter.incrementBytesRead(length); + ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers); + for (final ChunkDescriptor descriptor : chunks) { + builder.add(descriptor, stream.sliceBuffers(descriptor.size), f); + } + } + + /** + * Populate data in a parquet file range from a vectored range. + * + * @param currRange range to populated. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException if there is an error while reading from the stream, including a + * timeout. + */ + public void readFromVectoredRange(FileRange currRange, ChunkListBuilder builder) + throws IOException { + byte[] buffer; + try { + buffer = currRange.getData().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer)); + for (ChunkDescriptor descriptor : chunks) { + builder.add(descriptor, stream.sliceBuffers(descriptor.size), f); + } + } + + /** @return the position following the last byte of these chunks */ + public long endPos() { + return offset + length; + } + } +} diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java new file mode 100644 index 000000000000..657745a392cb --- /dev/null +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.schema.MessageType; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Write records to a Parquet file. + * + *

NOTE: The file was copied and modified to reduce hadoop dependencies. + */ +public class ParquetWriter implements Closeable { + + public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024; + public static final int DEFAULT_PAGE_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE; + public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME = + CompressionCodecName.UNCOMPRESSED; + public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = + ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED; + public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false; + public static final WriterVersion DEFAULT_WRITER_VERSION = + ParquetProperties.DEFAULT_WRITER_VERSION; + + public static final String OBJECT_MODEL_NAME_PROP = "writer.model.name"; + + // max size (bytes) to write as padding and the min size of a row group + public static final int MAX_PADDING_SIZE_DEFAULT = 8 * 1024 * 1024; // 8MB + + private final InternalParquetRecordWriter writer; + private final CodecFactory codecFactory; + + ParquetWriter( + OutputFile file, + ParquetFileWriter.Mode mode, + WriteSupport writeSupport, + CompressionCodecName compressionCodecName, + long rowGroupSize, + boolean validating, + Configuration conf, + int maxPaddingSize, + ParquetProperties encodingProps) + throws IOException { + + WriteSupport.WriteContext writeContext = writeSupport.init(conf); + MessageType schema = writeContext.getSchema(); + + ParquetFileWriter fileWriter = + new ParquetFileWriter( + file, + schema, + mode, + rowGroupSize, + maxPaddingSize, + encodingProps.getColumnIndexTruncateLength(), + encodingProps.getStatisticsTruncateLength(), + encodingProps.getPageWriteChecksumEnabled(), + (FileEncryptionProperties) null); + fileWriter.start(); + + this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold()); + CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName); + this.writer = + new InternalParquetRecordWriter( + fileWriter, + writeSupport, + schema, + writeContext.getExtraMetaData(), + rowGroupSize, + compressor, + validating, + encodingProps); + } + + public void write(T object) throws IOException { + try { + writer.write(object); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public void close() throws IOException { + try { + writer.close(); + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + // release after the writer closes in case it is used for a last flush + codecFactory.release(); + } + } + + /** @return the ParquetMetadata written to the (closed) file. */ + public ParquetMetadata getFooter() { + return writer.getFooter(); + } + + /** @return the total size of data written to the file and buffered in memory */ + public long getDataSize() { + return writer.getDataSize(); + } + + /** + * An abstract builder class for ParquetWriter instances. + * + *

Object models should extend this builder to provide writer configuration options. + * + * @param The type of objects written by the constructed ParquetWriter. + * @param The type of this builder that is returned by builder methods + */ + public abstract static class Builder> { + private OutputFile file = null; + private Configuration conf = new Configuration(); + private ParquetFileWriter.Mode mode; + private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME; + private long rowGroupSize = DEFAULT_BLOCK_SIZE; + private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT; + private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED; + private ParquetProperties.Builder encodingPropsBuilder = ParquetProperties.builder(); + + protected Builder(OutputFile path) { + this.file = path; + } + + /** @return this as the correct subclass of ParquetWriter.Builder. */ + protected abstract SELF self(); + + /** + * @param conf a configuration + * @return an appropriate WriteSupport for the object model. + */ + protected abstract WriteSupport getWriteSupport(Configuration conf); + + /** + * Set the {@link Configuration} used by the constructed writer. + * + * @param conf a {@code Configuration} + * @return this builder for method chaining. + */ + public SELF withConf(Configuration conf) { + this.conf = conf; + return self(); + } + + /** + * Set the {@link ParquetFileWriter.Mode write mode} used when creating the backing file for + * this writer. + * + * @param mode a {@code ParquetFileWriter.Mode} + * @return this builder for method chaining. + */ + public SELF withWriteMode(ParquetFileWriter.Mode mode) { + this.mode = mode; + return self(); + } + + /** + * Set the {@link CompressionCodecName compression codec} used by the constructed writer. + * + * @param codecName a {@code CompressionCodecName} + * @return this builder for method chaining. + */ + public SELF withCompressionCodec(CompressionCodecName codecName) { + this.codecName = codecName; + return self(); + } + + /** + * Set the Parquet format row group size used by the constructed writer. + * + * @param rowGroupSize an integer size in bytes + * @return this builder for method chaining. + * @deprecated Use {@link #withRowGroupSize(long)} instead + */ + @Deprecated + public SELF withRowGroupSize(int rowGroupSize) { + return withRowGroupSize((long) rowGroupSize); + } + + /** + * Set the Parquet format row group size used by the constructed writer. + * + * @param rowGroupSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withRowGroupSize(long rowGroupSize) { + this.rowGroupSize = rowGroupSize; + return self(); + } + + /** + * Set the Parquet format page size used by the constructed writer. + * + * @param pageSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withPageSize(int pageSize) { + encodingPropsBuilder.withPageSize(pageSize); + return self(); + } + + /** + * Sets the Parquet format page row count limit used by the constructed writer. + * + * @param rowCount limit for the number of rows stored in a page + * @return this builder for method chaining + */ + public SELF withPageRowCountLimit(int rowCount) { + encodingPropsBuilder.withPageRowCountLimit(rowCount); + return self(); + } + + /** + * Set the Parquet format dictionary page size used by the constructed writer. + * + * @param dictionaryPageSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withDictionaryPageSize(int dictionaryPageSize) { + encodingPropsBuilder.withDictionaryPageSize(dictionaryPageSize); + return self(); + } + + /** + * Set the maximum amount of padding, in bytes, that will be used to align row groups with + * blocks in the underlying filesystem. If the underlying filesystem is not a block + * filesystem like HDFS, this has no effect. + * + * @param maxPaddingSize an integer size in bytes + * @return this builder for method chaining. + */ + public SELF withMaxPaddingSize(int maxPaddingSize) { + this.maxPaddingSize = maxPaddingSize; + return self(); + } + + /** + * Enables dictionary encoding for the constructed writer. + * + * @return this builder for method chaining. + */ + public SELF enableDictionaryEncoding() { + encodingPropsBuilder.withDictionaryEncoding(true); + return self(); + } + + /** + * Enable or disable dictionary encoding for the constructed writer. + * + * @param enableDictionary whether dictionary encoding should be enabled + * @return this builder for method chaining. + */ + public SELF withDictionaryEncoding(boolean enableDictionary) { + encodingPropsBuilder.withDictionaryEncoding(enableDictionary); + return self(); + } + + public SELF withByteStreamSplitEncoding(boolean enableByteStreamSplit) { + encodingPropsBuilder.withByteStreamSplitEncoding(enableByteStreamSplit); + return self(); + } + + /** + * Enable or disable dictionary encoding of the specified column for the constructed writer. + * + * @param columnPath the path of the column (dot-string) + * @param enableDictionary whether dictionary encoding should be enabled + * @return this builder for method chaining. + */ + public SELF withDictionaryEncoding(String columnPath, boolean enableDictionary) { + encodingPropsBuilder.withDictionaryEncoding(columnPath, enableDictionary); + return self(); + } + + /** + * Enables validation for the constructed writer. + * + * @return this builder for method chaining. + */ + public SELF enableValidation() { + this.enableValidation = true; + return self(); + } + + /** + * Enable or disable validation for the constructed writer. + * + * @param enableValidation whether validation should be enabled + * @return this builder for method chaining. + */ + public SELF withValidation(boolean enableValidation) { + this.enableValidation = enableValidation; + return self(); + } + + /** + * Set the {@link WriterVersion format version} used by the constructed writer. + * + * @param version a {@code WriterVersion} + * @return this builder for method chaining. + */ + public SELF withWriterVersion(WriterVersion version) { + encodingPropsBuilder.withWriterVersion(version); + return self(); + } + + /** + * Enables writing page level checksums for the constructed writer. + * + * @return this builder for method chaining. + */ + public SELF enablePageWriteChecksum() { + encodingPropsBuilder.withPageWriteChecksumEnabled(true); + return self(); + } + + /** + * Enables writing page level checksums for the constructed writer. + * + * @param enablePageWriteChecksum whether page checksums should be written out + * @return this builder for method chaining. + */ + public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) { + encodingPropsBuilder.withPageWriteChecksumEnabled(enablePageWriteChecksum); + return self(); + } + + /** + * Sets the NDV (number of distinct values) for the specified column. + * + * @param columnPath the path of the column (dot-string) + * @param ndv the NDV of the column + * @return this builder for method chaining. + */ + public SELF withBloomFilterNDV(String columnPath, long ndv) { + encodingPropsBuilder.withBloomFilterNDV(columnPath, ndv); + + return self(); + } + + public SELF withBloomFilterFPP(String columnPath, double fpp) { + encodingPropsBuilder.withBloomFilterFPP(columnPath, fpp); + return self(); + } + + /** + * Sets the bloom filter enabled/disabled. + * + * @param enabled whether to write bloom filters + * @return this builder for method chaining + */ + public SELF withBloomFilterEnabled(boolean enabled) { + encodingPropsBuilder.withBloomFilterEnabled(enabled); + return self(); + } + + /** + * Sets the bloom filter enabled/disabled for the specified column. If not set for the + * column specifically the default enabled/disabled state will take place. See {@link + * #withBloomFilterEnabled(boolean)}. + * + * @param columnPath the path of the column (dot-string) + * @param enabled whether to write bloom filter for the column + * @return this builder for method chaining + */ + public SELF withBloomFilterEnabled(String columnPath, boolean enabled) { + encodingPropsBuilder.withBloomFilterEnabled(columnPath, enabled); + return self(); + } + + /** + * Sets the minimum number of rows to write before a page size check is done. + * + * @param min writes at least `min` rows before invoking a page size check + * @return this builder for method chaining + */ + public SELF withMinRowCountForPageSizeCheck(int min) { + encodingPropsBuilder.withMinRowCountForPageSizeCheck(min); + return self(); + } + + /** + * Sets the maximum number of rows to write before a page size check is done. + * + * @param max makes a page size check after `max` rows have been written + * @return this builder for method chaining + */ + public SELF withMaxRowCountForPageSizeCheck(int max) { + encodingPropsBuilder.withMaxRowCountForPageSizeCheck(max); + return self(); + } + + /** + * Sets the length to be used for truncating binary values in a binary column index. + * + * @param length the length to truncate to + * @return this builder for method chaining + */ + public SELF withColumnIndexTruncateLength(int length) { + encodingPropsBuilder.withColumnIndexTruncateLength(length); + return self(); + } + + /** + * Sets the length which the min/max binary values in row groups are truncated to. + * + * @param length the length to truncate to + * @return this builder for method chaining + */ + public SELF withStatisticsTruncateLength(int length) { + encodingPropsBuilder.withStatisticsTruncateLength(length); + return self(); + } + + /** + * Set a property that will be available to the read path. For writers that use a Hadoop + * configuration, this is the recommended way to add configuration values. + * + * @param property a String property name + * @param value a String property value + * @return this builder for method chaining. + */ + public SELF config(String property, String value) { + conf.set(property, value); + return self(); + } + + /** + * Build a {@link ParquetWriter} with the accumulated configuration. + * + * @return a configured {@code ParquetWriter} instance. + * @throws IOException if there is an error while creating the writer + */ + public ParquetWriter build() throws IOException { + return new ParquetWriter<>( + file, + mode, + getWriteSupport(conf), + codecName, + rowGroupSize, + enableValidation, + conf, + maxPaddingSize, + encodingPropsBuilder.build()); + } + } +}