decompressors = new HashMap<>();
-
- @Override
- public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
- return createCompressor(codecName);
- }
-
- @Override
- public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
- return decompressors.computeIfAbsent(codecName, this::createDecompressor);
- }
-
- protected BytesInputCompressor createCompressor(CompressionCodecName codecName) {
- switch (codecName) {
- case UNCOMPRESSED:
- return new NoopBytesInputCompressor();
- case SNAPPY:
- return new SnappyBytesInputCompressor();
- default:
- throw new IllegalArgumentException("Unimplemented codec: " + codecName);
- }
- }
-
- protected BytesInputDecompressor createDecompressor(CompressionCodecName codecName) {
- switch (codecName) {
- case UNCOMPRESSED:
- return new NoopBytesInputDecompressor();
- case SNAPPY:
- return new SnappyBytesInputDecompressor();
- default:
- throw new IllegalArgumentException("Unimplemented codec: " + codecName);
- }
- }
-
- @Override
- public void release() {
- for (BytesInputCompressor compressor : compressors.values()) {
- compressor.release();
- }
- compressors.clear();
- for (BytesInputDecompressor decompressor : decompressors.values()) {
- decompressor.release();
- }
- decompressors.clear();
- }
-}
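The factory above hands out a fresh compressor on every call but caches one decompressor per codec via computeIfAbsent. A minimal sketch of that per-codec caching pattern, assuming only the CompressionCodecName enum from parquet-mr; the class and method names here are illustrative:

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

final class PerCodecCache<T> {
  // one cached instance per codec; computeIfAbsent creates it on the first request only
  private final Map<CompressionCodecName, T> cache = new EnumMap<>(CompressionCodecName.class);
  private final Function<CompressionCodecName, T> factory;

  PerCodecCache(Function<CompressionCodecName, T> factory) {
    this.factory = factory;
  }

  T get(CompressionCodecName codecName) {
    // repeated lookups for the same codec return the same cached instance
    return cache.computeIfAbsent(codecName, factory);
  }
}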
diff --git a/src/main/java/org/apache/parquet/local/ColumnChunkPageReadStore.java b/src/main/java/org/apache/parquet/local/ColumnChunkPageReadStore.java
deleted file mode 100644
index 8b23d75..0000000
--- a/src/main/java/org/apache/parquet/local/ColumnChunkPageReadStore.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.local;
-
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.*;
-import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
-import org.apache.parquet.crypto.AesCipher;
-import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import org.apache.parquet.format.BlockCipher;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.internal.filter2.columnindex.RowRanges;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore {
- private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageReadStore.class);
-
- /**
- * PageReader for a single column chunk. A column chunk contains several pages, which are yielded
- * one by one in order.
- *
- * This implementation is provided with a list of pages, each of which is decompressed and
- * passed through.
- */
- static final class ColumnChunkPageReader implements PageReader {
-
- private final BytesInputDecompressor decompressor;
- private final long valueCount;
- private final Queue<DataPage> compressedPages;
- private final DictionaryPage compressedDictionaryPage;
- // null means no page synchronization is required; firstRowIndex will not be returned by the
- // pages
- private final OffsetIndex offsetIndex;
- private final long rowCount;
- private int pageIndex = 0;
-
- private final BlockCipher.Decryptor blockDecryptor;
- private final byte[] dataPageAAD;
- private final byte[] dictionaryPageAAD;
-
- ColumnChunkPageReader(
- BytesInputDecompressor decompressor,
- List<DataPage> compressedPages,
- DictionaryPage compressedDictionaryPage,
- OffsetIndex offsetIndex,
- long rowCount,
- BlockCipher.Decryptor blockDecryptor,
- byte[] fileAAD,
- int rowGroupOrdinal,
- int columnOrdinal) {
- this.decompressor = decompressor;
- this.compressedPages = new ArrayDeque<>(compressedPages);
- this.compressedDictionaryPage = compressedDictionaryPage;
- long count = 0;
- for (DataPage p : compressedPages) {
- count += p.getValueCount();
- }
- this.valueCount = count;
- this.offsetIndex = offsetIndex;
- this.rowCount = rowCount;
-
- this.blockDecryptor = blockDecryptor;
-
- if (null != blockDecryptor) {
- dataPageAAD =
- AesCipher.createModuleAAD(
- fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
- dictionaryPageAAD =
- AesCipher.createModuleAAD(
- fileAAD, ModuleType.DictionaryPage, rowGroupOrdinal, columnOrdinal, -1);
- } else {
- dataPageAAD = null;
- dictionaryPageAAD = null;
- }
- }
-
- private int getPageOrdinal(int currentPageIndex) {
- if (null == offsetIndex) {
- return currentPageIndex;
- }
-
- return offsetIndex.getPageOrdinal(currentPageIndex);
- }
-
- @Override
- public long getTotalValueCount() {
- return valueCount;
- }
-
- @Override
- public DataPage readPage() {
- final DataPage compressedPage = compressedPages.poll();
- if (compressedPage == null) {
- return null;
- }
- final int currentPageIndex = pageIndex++;
-
- if (null != blockDecryptor) {
- AesCipher.quickUpdatePageAAD(dataPageAAD, getPageOrdinal(currentPageIndex));
- }
-
- return compressedPage.accept(
- new DataPage.Visitor<DataPage>() {
- @Override
- public DataPage visit(DataPageV1 dataPageV1) {
- try {
- BytesInput bytes = dataPageV1.getBytes();
- if (null != blockDecryptor) {
- bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
- }
- BytesInput decompressed =
- decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
-
- final DataPageV1 decompressedPage;
- if (offsetIndex == null) {
- decompressedPage =
- new DataPageV1(
- decompressed,
- dataPageV1.getValueCount(),
- dataPageV1.getUncompressedSize(),
- dataPageV1.getStatistics(),
- dataPageV1.getRlEncoding(),
- dataPageV1.getDlEncoding(),
- dataPageV1.getValueEncoding());
- } else {
- long firstRowIndex = offsetIndex.getFirstRowIndex(currentPageIndex);
- decompressedPage =
- new DataPageV1(
- decompressed,
- dataPageV1.getValueCount(),
- dataPageV1.getUncompressedSize(),
- firstRowIndex,
- Math.toIntExact(
- offsetIndex.getLastRowIndex(currentPageIndex, rowCount)
- - firstRowIndex
- + 1),
- dataPageV1.getStatistics(),
- dataPageV1.getRlEncoding(),
- dataPageV1.getDlEncoding(),
- dataPageV1.getValueEncoding());
- }
- if (dataPageV1.getCrc().isPresent()) {
- decompressedPage.setCrc(dataPageV1.getCrc().getAsInt());
- }
- return decompressedPage;
- } catch (IOException e) {
- throw new ParquetDecodingException("could not decompress page", e);
- }
- }
-
- @Override
- public DataPage visit(DataPageV2 dataPageV2) {
- if (!dataPageV2.isCompressed() && offsetIndex == null && null == blockDecryptor) {
- return dataPageV2;
- }
- BytesInput pageBytes = dataPageV2.getData();
-
- if (null != blockDecryptor) {
- try {
- pageBytes =
- BytesInput.from(blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
- } catch (IOException e) {
- throw new ParquetDecodingException(
- "could not convert page ByteInput to byte array", e);
- }
- }
- if (dataPageV2.isCompressed()) {
- int uncompressedSize =
- Math.toIntExact(
- dataPageV2.getUncompressedSize()
- - dataPageV2.getDefinitionLevels().size()
- - dataPageV2.getRepetitionLevels().size());
- try {
- pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not decompress page", e);
- }
- }
-
- if (offsetIndex == null) {
- return DataPageV2.uncompressed(
- dataPageV2.getRowCount(),
- dataPageV2.getNullCount(),
- dataPageV2.getValueCount(),
- dataPageV2.getRepetitionLevels(),
- dataPageV2.getDefinitionLevels(),
- dataPageV2.getDataEncoding(),
- pageBytes,
- dataPageV2.getStatistics());
- } else {
- return DataPageV2.uncompressed(
- dataPageV2.getRowCount(),
- dataPageV2.getNullCount(),
- dataPageV2.getValueCount(),
- offsetIndex.getFirstRowIndex(currentPageIndex),
- dataPageV2.getRepetitionLevels(),
- dataPageV2.getDefinitionLevels(),
- dataPageV2.getDataEncoding(),
- pageBytes,
- dataPageV2.getStatistics());
- }
- }
- });
- }
-
- @Override
- public DictionaryPage readDictionaryPage() {
- if (compressedDictionaryPage == null) {
- return null;
- }
- try {
- BytesInput bytes = compressedDictionaryPage.getBytes();
- if (null != blockDecryptor) {
- bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dictionaryPageAAD));
- }
- DictionaryPage decompressedPage =
- new DictionaryPage(
- decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()),
- compressedDictionaryPage.getDictionarySize(),
- compressedDictionaryPage.getEncoding());
- if (compressedDictionaryPage.getCrc().isPresent()) {
- decompressedPage.setCrc(compressedDictionaryPage.getCrc().getAsInt());
- }
- return decompressedPage;
- } catch (IOException e) {
- throw new ParquetDecodingException("Could not decompress dictionary page", e);
- }
- }
- }
-
- private final Map<ColumnDescriptor, ColumnChunkPageReader> readers =
- new HashMap<>();
- private final long rowCount;
- private final long rowIndexOffset;
- private final RowRanges rowRanges;
-
- public ColumnChunkPageReadStore(long rowCount) {
- this(rowCount, -1);
- }
-
- ColumnChunkPageReadStore(RowRanges rowRanges) {
- this(rowRanges, -1);
- }
-
- ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) {
- this.rowCount = rowCount;
- this.rowIndexOffset = rowIndexOffset;
- rowRanges = null;
- }
-
- ColumnChunkPageReadStore(RowRanges rowRanges, long rowIndexOffset) {
- this.rowRanges = rowRanges;
- this.rowIndexOffset = rowIndexOffset;
- rowCount = rowRanges.rowCount();
- }
-
- @Override
- public long getRowCount() {
- return rowCount;
- }
-
- @Override
- public Optional<Long> getRowIndexOffset() {
- return rowIndexOffset < 0 ? Optional.empty() : Optional.of(rowIndexOffset);
- }
-
- @Override
- public PageReader getPageReader(ColumnDescriptor path) {
- final PageReader pageReader = readers.get(path);
- if (pageReader == null) {
- throw new IllegalArgumentException(
- path + " is not in the store: " + readers.keySet() + " " + rowCount);
- }
- return pageReader;
- }
-
- @Override
- public DictionaryPage readDictionaryPage(ColumnDescriptor descriptor) {
- return readers.get(descriptor).readDictionaryPage();
- }
-
- @Override
- public Optional<PrimitiveIterator.OfLong> getRowIndexes() {
- return rowRanges == null ? Optional.empty() : Optional.of(rowRanges.iterator());
- }
-
- void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) {
- if (readers.put(path, reader) != null) {
- throw new RuntimeException(path + " was added twice");
- }
- }
-}
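Once a row group has been assembled, callers consume the store through the public PageReadStore/PageReader interfaces used above. A minimal consumption sketch, assuming only those parquet-column interfaces; the helper class, its name, and the empty page-handling loop are illustrative:

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;

import java.util.List;

final class RowGroupScanSketch {
  /** Drains every page of every projected column and returns the total value count. */
  static long drain(PageReadStore rowGroup, List<ColumnDescriptor> projectedColumns) {
    long totalValues = 0;
    for (ColumnDescriptor column : projectedColumns) {
      // getPageReader throws IllegalArgumentException if the column is not in the store
      PageReader pages = rowGroup.getPageReader(column);
      totalValues += pages.getTotalValueCount();
      // pages come back decompressed (and decrypted) in file order; null marks the end
      for (DataPage page = pages.readPage(); page != null; page = pages.readPage()) {
        // hand the page to a column reader here
      }
    }
    return totalValues;
  }
}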
diff --git a/src/main/java/org/apache/parquet/local/ColumnChunkPageWriteStore.java b/src/main/java/org/apache/parquet/local/ColumnChunkPageWriteStore.java
deleted file mode 100644
index ce9a4e6..0000000
--- a/src/main/java/org/apache/parquet/local/ColumnChunkPageWriteStore.java
+++ /dev/null
@@ -1,501 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.local;
-
-import org.apache.parquet.bytes.ByteBufferAllocator;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.ConcatenatingByteArrayCollector;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageWriteStore;
-import org.apache.parquet.column.page.PageWriter;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
-import org.apache.parquet.compression.CompressionCodecFactory;
-import org.apache.parquet.crypto.AesCipher;
-import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
-import org.apache.parquet.crypto.InternalFileEncryptor;
-import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import org.apache.parquet.format.BlockCipher;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
-import org.apache.parquet.io.ParquetEncodingException;
-import org.apache.parquet.schema.MessageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.*;
-import java.util.zip.CRC32;
-
-class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
- private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class);
-
- private static final ParquetMetadataConverter parquetMetadataConverter =
- new ParquetMetadataConverter();
-
- private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter {
-
- private final ColumnDescriptor path;
- private final CompressionCodecFactory.BytesInputCompressor compressor;
-
- private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
- private final ConcatenatingByteArrayCollector buf;
- private DictionaryPage dictionaryPage;
-
- private long uncompressedLength;
- private long compressedLength;
- private long totalValueCount;
- private int pageCount;
-
- // repetition and definition level encodings are used only for v1 pages and don't change
- private final Set<Encoding> rlEncodings = new HashSet<>();
- private final Set<Encoding> dlEncodings = new HashSet<>();
- private final List<Encoding> dataEncodings = new ArrayList<>();
-
- private BloomFilter bloomFilter;
- private ColumnIndexBuilder columnIndexBuilder;
- private OffsetIndexBuilder offsetIndexBuilder;
- private Statistics<?> totalStatistics;
-
- private final CRC32 crc;
- boolean pageWriteChecksumEnabled;
-
- private final BlockCipher.Encryptor headerBlockEncryptor;
- private final BlockCipher.Encryptor pageBlockEncryptor;
- private final int rowGroupOrdinal;
- private final int columnOrdinal;
- private int pageOrdinal;
- private final byte[] dataPageAAD;
- private final byte[] dataPageHeaderAAD;
- private final byte[] fileAAD;
-
- private ColumnChunkPageWriter(
- ColumnDescriptor path,
- CompressionCodecFactory.BytesInputCompressor compressor,
- ByteBufferAllocator allocator,
- int columnIndexTruncateLength,
- boolean pageWriteChecksumEnabled,
- BlockCipher.Encryptor headerBlockEncryptor,
- BlockCipher.Encryptor pageBlockEncryptor,
- byte[] fileAAD,
- int rowGroupOrdinal,
- int columnOrdinal) {
- this.path = path;
- this.compressor = compressor;
- this.buf = new ConcatenatingByteArrayCollector();
- this.columnIndexBuilder =
- ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
- this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
- this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
- this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
-
- this.headerBlockEncryptor = headerBlockEncryptor;
- this.pageBlockEncryptor = pageBlockEncryptor;
- this.fileAAD = fileAAD;
- this.rowGroupOrdinal = rowGroupOrdinal;
- this.columnOrdinal = columnOrdinal;
- this.pageOrdinal = -1;
- if (null != headerBlockEncryptor) {
- dataPageHeaderAAD =
- AesCipher.createModuleAAD(
- fileAAD, ModuleType.DataPageHeader, rowGroupOrdinal, columnOrdinal, 0);
- } else {
- dataPageHeaderAAD = null;
- }
- if (null != pageBlockEncryptor) {
- dataPageAAD =
- AesCipher.createModuleAAD(
- fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
- } else {
- dataPageAAD = null;
- }
- }
-
- @Override
- @Deprecated
- public void writePage(
- BytesInput bytesInput,
- int valueCount,
- Statistics<?> statistics,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding)
- throws IOException {
- // Setting the builders to the no-op ones so no column/offset indexes will be written for this
- // column chunk
- columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
- offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
-
- writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding);
- }
-
- @Override
- public void writePage(
- BytesInput bytes,
- int valueCount,
- int rowCount,
- Statistics<?> statistics,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding)
- throws IOException {
- pageOrdinal++;
- long uncompressedSize = bytes.size();
- if (uncompressedSize > Integer.MAX_VALUE || uncompressedSize < 0) {
- throw new ParquetEncodingException(
- "Cannot write page larger than Integer.MAX_VALUE or negative bytes: "
- + uncompressedSize);
- }
- BytesInput compressedBytes = compressor.compress(bytes);
- if (null != pageBlockEncryptor) {
- AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
- compressedBytes =
- BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD));
- }
- long compressedSize = compressedBytes.size();
- if (compressedSize > Integer.MAX_VALUE) {
- throw new ParquetEncodingException(
- "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " + compressedSize);
- }
- tempOutputStream.reset();
- if (null != headerBlockEncryptor) {
- AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
- }
- if (pageWriteChecksumEnabled) {
- crc.reset();
- crc.update(compressedBytes.toByteArray());
- parquetMetadataConverter.writeDataPageV1Header(
- (int) uncompressedSize,
- (int) compressedSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- (int) crc.getValue(),
- tempOutputStream,
- headerBlockEncryptor,
- dataPageHeaderAAD);
- } else {
- parquetMetadataConverter.writeDataPageV1Header(
- (int) uncompressedSize,
- (int) compressedSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- tempOutputStream,
- headerBlockEncryptor,
- dataPageHeaderAAD);
- }
- this.uncompressedLength += uncompressedSize;
- this.compressedLength += compressedSize;
- this.totalValueCount += valueCount;
- this.pageCount += 1;
-
- // Copying the statistics if it is not initialized yet so we have the correct typed one
- if (totalStatistics == null) {
- totalStatistics = statistics.copy();
- } else {
- totalStatistics.mergeStatistics(statistics);
- }
-
- columnIndexBuilder.add(statistics);
- offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);
-
- // by concatenating before collecting instead of collecting twice,
- // we only allocate one buffer to copy into instead of multiple.
- buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
- rlEncodings.add(rlEncoding);
- dlEncodings.add(dlEncoding);
- dataEncodings.add(valuesEncoding);
- }
-
- @Override
- public void writePageV2(
- int rowCount,
- int nullCount,
- int valueCount,
- BytesInput repetitionLevels,
- BytesInput definitionLevels,
- Encoding dataEncoding,
- BytesInput data,
- Statistics<?> statistics)
- throws IOException {
- pageOrdinal++;
-
- int rlByteLength = toIntWithCheck(repetitionLevels.size());
- int dlByteLength = toIntWithCheck(definitionLevels.size());
- int uncompressedSize =
- toIntWithCheck(data.size() + repetitionLevels.size() + definitionLevels.size());
- // TODO: decide if we compress
- BytesInput compressedData = compressor.compress(data);
- if (null != pageBlockEncryptor) {
- AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
- compressedData =
- BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
- }
- int compressedSize =
- toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size());
- tempOutputStream.reset();
- if (null != headerBlockEncryptor) {
- AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
- }
- parquetMetadataConverter.writeDataPageV2Header(
- uncompressedSize,
- compressedSize,
- valueCount,
- nullCount,
- rowCount,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- tempOutputStream,
- headerBlockEncryptor,
- dataPageHeaderAAD);
- this.uncompressedLength += uncompressedSize;
- this.compressedLength += compressedSize;
- this.totalValueCount += valueCount;
- this.pageCount += 1;
-
- // Copying the statistics if it is not initialized yet so we have the correct typed one
- if (totalStatistics == null) {
- totalStatistics = statistics.copy();
- } else {
- totalStatistics.mergeStatistics(statistics);
- }
-
- columnIndexBuilder.add(statistics);
- offsetIndexBuilder.add(
- toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount);
-
- // by concatenating before collecting instead of collecting twice,
- // we only allocate one buffer to copy into instead of multiple.
- buf.collect(
- BytesInput.concat(
- BytesInput.from(tempOutputStream),
- repetitionLevels,
- definitionLevels,
- compressedData));
- dataEncodings.add(dataEncoding);
- }
-
- private int toIntWithCheck(long size) {
- if (size > Integer.MAX_VALUE) {
- throw new ParquetEncodingException(
- "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
- }
- return (int) size;
- }
-
- @Override
- public long getMemSize() {
- return buf.size();
- }
-
- public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
- if (null == headerBlockEncryptor) {
- writer.writeColumnChunk(
- path,
- totalValueCount,
- compressor.getCodecName(),
- dictionaryPage,
- buf,
- uncompressedLength,
- compressedLength,
- totalStatistics,
- columnIndexBuilder,
- offsetIndexBuilder,
- bloomFilter,
- rlEncodings,
- dlEncodings,
- dataEncodings);
- } else {
- writer.writeColumnChunk(
- path,
- totalValueCount,
- compressor.getCodecName(),
- dictionaryPage,
- buf,
- uncompressedLength,
- compressedLength,
- totalStatistics,
- columnIndexBuilder,
- offsetIndexBuilder,
- bloomFilter,
- rlEncodings,
- dlEncodings,
- dataEncodings,
- headerBlockEncryptor,
- rowGroupOrdinal,
- columnOrdinal,
- fileAAD);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- String.format(
- "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
- buf.size(),
- path,
- totalValueCount,
- uncompressedLength,
- compressedLength,
- pageCount,
- new HashSet<>(dataEncodings))
- + (dictionaryPage != null
- ? String.format(
- ", dic { %,d entries, %,dB raw, %,dB comp}",
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getUncompressedSize(),
- dictionaryPage.getDictionarySize())
- : ""));
- }
- rlEncodings.clear();
- dlEncodings.clear();
- dataEncodings.clear();
- pageCount = 0;
- pageOrdinal = -1;
- }
-
- @Override
- public long allocatedSize() {
- return buf.size();
- }
-
- @Override
- public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- if (this.dictionaryPage != null) {
- throw new ParquetEncodingException("Only one dictionary page is allowed");
- }
- BytesInput dictionaryBytes = dictionaryPage.getBytes();
- int uncompressedSize = (int) dictionaryBytes.size();
- BytesInput compressedBytes = compressor.compress(dictionaryBytes);
- if (null != pageBlockEncryptor) {
- byte[] dictonaryPageAAD =
- AesCipher.createModuleAAD(
- fileAAD, ModuleType.DictionaryPage, rowGroupOrdinal, columnOrdinal, -1);
- compressedBytes =
- BytesInput.from(
- pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dictonaryPageAAD));
- }
- this.dictionaryPage =
- new DictionaryPage(
- BytesInput.copy(compressedBytes),
- uncompressedSize,
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getEncoding());
- }
-
- @Override
- public String memUsageString(String prefix) {
- return buf.memUsageString(prefix + " ColumnChunkPageWriter");
- }
-
- @Override
- public void writeBloomFilter(BloomFilter bloomFilter) {
- this.bloomFilter = bloomFilter;
- }
- }
-
- private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers =
- new HashMap<>();
- private final MessageType schema;
-
- public ColumnChunkPageWriteStore(
- CompressionCodecFactory.BytesInputCompressor compressor,
- MessageType schema,
- ByteBufferAllocator allocator,
- int columnIndexTruncateLength,
- boolean pageWriteChecksumEnabled,
- InternalFileEncryptor fileEncryptor,
- int rowGroupOrdinal) {
- this.schema = schema;
- if (null == fileEncryptor) {
- for (ColumnDescriptor path : schema.getColumns()) {
- writers.put(
- path,
- new ColumnChunkPageWriter(
- path,
- compressor,
- allocator,
- columnIndexTruncateLength,
- pageWriteChecksumEnabled,
- null,
- null,
- null,
- -1,
- -1));
- }
- return;
- }
-
- // Encrypted file
- int columnOrdinal = -1;
- byte[] fileAAD = fileEncryptor.getFileAAD();
- for (ColumnDescriptor path : schema.getColumns()) {
- columnOrdinal++;
- BlockCipher.Encryptor headerBlockEncryptor = null;
- BlockCipher.Encryptor pageBlockEncryptor = null;
- ColumnPath columnPath = ColumnPath.get(path.getPath());
-
- InternalColumnEncryptionSetup columnSetup =
- fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal);
- if (columnSetup.isEncrypted()) {
- headerBlockEncryptor = columnSetup.getMetaDataEncryptor();
- pageBlockEncryptor = columnSetup.getDataEncryptor();
- }
-
- writers.put(
- path,
- new ColumnChunkPageWriter(
- path,
- compressor,
- allocator,
- columnIndexTruncateLength,
- pageWriteChecksumEnabled,
- headerBlockEncryptor,
- pageBlockEncryptor,
- fileAAD,
- rowGroupOrdinal,
- columnOrdinal));
- }
- }
-
- @Override
- public PageWriter getPageWriter(ColumnDescriptor path) {
- return writers.get(path);
- }
-
- @Override
- public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
- return writers.get(path);
- }
-
- public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
- for (ColumnDescriptor path : schema.getColumns()) {
- ColumnChunkPageWriter pageWriter = writers.get(path);
- pageWriter.writeToFileWriter(writer);
- }
- }
-}
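When pageWriteChecksumEnabled is set, the page writer above computes a CRC32 over the compressed page bytes and writes the low 32 bits into the data page header. A standalone sketch of that checksum step, using only java.util.zip; the class name and sample input are illustrative:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class PageChecksumSketch {
  static int pageCrc(byte[] compressedPageBytes) {
    CRC32 crc = new CRC32();
    crc.reset();
    crc.update(compressedPageBytes);
    // the low 32 bits are what ends up in the page header's crc field
    return (int) crc.getValue();
  }

  public static void main(String[] args) {
    byte[] demo = "compressed-page-bytes".getBytes(StandardCharsets.UTF_8);
    System.out.printf("crc = 0x%08x%n", pageCrc(demo));
  }
}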
diff --git a/src/main/java/org/apache/parquet/local/ColumnIndexFilterUtils.java b/src/main/java/org/apache/parquet/local/ColumnIndexFilterUtils.java
deleted file mode 100644
index b83b0e4..0000000
--- a/src/main/java/org/apache/parquet/local/ColumnIndexFilterUtils.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.local;
-
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.internal.filter2.columnindex.RowRanges;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Formatter;
-import java.util.List;
-
-/** Internal utility class to help at column index based filtering. */
-class ColumnIndexFilterUtils {
- static class OffsetRange {
- private final long offset;
- private long length;
-
- private OffsetRange(long offset, int length) {
- this.offset = offset;
- this.length = length;
- }
-
- long getOffset() {
- return offset;
- }
-
- long getLength() {
- return length;
- }
-
- private boolean extend(long offset, int length) {
- if (this.offset + this.length == offset) {
- this.length += length;
- return true;
- } else {
- return false;
- }
- }
- }
-
- private static class FilteredOffsetIndex implements OffsetIndex {
- private final OffsetIndex offsetIndex;
- private final int[] indexMap;
-
- private FilteredOffsetIndex(OffsetIndex offsetIndex, int[] indexMap) {
- this.offsetIndex = offsetIndex;
- this.indexMap = indexMap;
- }
-
- @Override
- public int getPageOrdinal(int pageIndex) {
- return indexMap[pageIndex];
- }
-
- @Override
- public int getPageCount() {
- return indexMap.length;
- }
-
- @Override
- public long getOffset(int pageIndex) {
- return offsetIndex.getOffset(indexMap[pageIndex]);
- }
-
- @Override
- public int getCompressedPageSize(int pageIndex) {
- return offsetIndex.getCompressedPageSize(indexMap[pageIndex]);
- }
-
- @Override
- public long getFirstRowIndex(int pageIndex) {
- return offsetIndex.getFirstRowIndex(indexMap[pageIndex]);
- }
-
- @Override
- public long getLastRowIndex(int pageIndex, long totalRowCount) {
- int nextIndex = indexMap[pageIndex] + 1;
- return (nextIndex >= offsetIndex.getPageCount()
- ? totalRowCount
- : offsetIndex.getFirstRowIndex(nextIndex))
- - 1;
- }
-
- @Override
- public String toString() {
- try (Formatter formatter = new Formatter()) {
- formatter.format(
- "%-12s %20s %16s %20s\n", "", "offset", "compressed size", "first row index");
- for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
- int index = Arrays.binarySearch(indexMap, i);
- boolean isHidden = index < 0;
- formatter.format(
- "%spage-%-5d %20d %16d %20d\n",
- isHidden ? "- " : " ",
- isHidden ? i : index,
- offsetIndex.getOffset(i),
- offsetIndex.getCompressedPageSize(i),
- offsetIndex.getFirstRowIndex(i));
- }
- return formatter.toString();
- }
- }
- }
-
- /*
- * Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
- */
- static OffsetIndex filterOffsetIndex(
- OffsetIndex offsetIndex, RowRanges rowRanges, long totalRowCount) {
- int[] result = new int[offsetIndex.getPageCount()];
- int count = 0;
- for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
- long from = offsetIndex.getFirstRowIndex(i);
- if (rowRanges.isOverlapping(from, offsetIndex.getLastRowIndex(i, totalRowCount))) {
- result[count++] = i;
- }
- }
- return new FilteredOffsetIndex(offsetIndex, Arrays.copyOfRange(result, 0, count));
- }
-
- static List<OffsetRange> calculateOffsetRanges(
- OffsetIndex offsetIndex, ColumnChunkMetaData cm, long firstPageOffset) {
- List<OffsetRange> ranges = new ArrayList<>();
- int n = offsetIndex.getPageCount();
- if (n > 0) {
- OffsetRange currentRange = null;
-
- // Add a range for the dictionary page if required
- long rowGroupOffset = cm.getStartingPos();
- if (rowGroupOffset < firstPageOffset) {
- currentRange = new OffsetRange(rowGroupOffset, (int) (firstPageOffset - rowGroupOffset));
- ranges.add(currentRange);
- }
-
- for (int i = 0; i < n; ++i) {
- long offset = offsetIndex.getOffset(i);
- int length = offsetIndex.getCompressedPageSize(i);
- if (currentRange == null || !currentRange.extend(offset, length)) {
- currentRange = new OffsetRange(offset, length);
- ranges.add(currentRange);
- }
- }
- }
- return ranges;
- }
-}
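calculateOffsetRanges above coalesces page reads into larger scans: a page extends the current OffsetRange only when it starts exactly where that range ends, otherwise a new range is opened. A small worked example of the merge rule with hypothetical page offsets and compressed sizes:

final class RangeMergeSketch {
  public static void main(String[] args) {
    // {offset, compressedSize} per page; values are hypothetical
    long[][] pages = {{100, 50}, {150, 30}, {400, 20}};
    long rangeStart = pages[0][0];
    long rangeLength = pages[0][1];
    for (int i = 1; i < pages.length; i++) {
      if (rangeStart + rangeLength == pages[i][0]) {
        rangeLength += pages[i][1];               // consecutive -> extend (pages 0 and 1)
      } else {
        System.out.println("range [" + rangeStart + ", +" + rangeLength + ")");
        rangeStart = pages[i][0];                 // gap -> start a new range (page 2)
        rangeLength = pages[i][1];
      }
    }
    System.out.println("range [" + rangeStart + ", +" + rangeLength + ")");
    // prints: range [100, +80) then range [400, +20)
  }
}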
diff --git a/src/main/java/org/apache/parquet/local/ColumnIndexStoreImpl.java b/src/main/java/org/apache/parquet/local/ColumnIndexStoreImpl.java
deleted file mode 100644
index 9ecaac5..0000000
--- a/src/main/java/org/apache/parquet/local/ColumnIndexStoreImpl.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.local;
-
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static java.util.Collections.emptySet;
-
-/** Internal implementation of {@link ColumnIndexStore}. */
-class ColumnIndexStoreImpl implements ColumnIndexStore {
-
- private interface IndexStore {
- ColumnIndex getColumnIndex();
-
- OffsetIndex getOffsetIndex();
- }
-
- private class IndexStoreImpl implements IndexStore {
- private final ColumnChunkMetaData meta;
- private ColumnIndex columnIndex;
- private boolean columnIndexRead;
- private final OffsetIndex offsetIndex;
-
- IndexStoreImpl(ColumnChunkMetaData meta) {
- this.meta = meta;
- OffsetIndex oi;
- try {
- oi = reader.readOffsetIndex(meta);
- } catch (IOException e) {
- // If the I/O issue still stands it will fail the reading later;
- // otherwise we fail the filtering only with a missing offset index.
- LOGGER.warn("Unable to read offset index for column {}", meta.getPath(), e);
- oi = null;
- }
- if (oi == null) {
- throw new MissingOffsetIndexException(meta.getPath());
- }
- offsetIndex = oi;
- }
-
- @Override
- public ColumnIndex getColumnIndex() {
- if (!columnIndexRead) {
- try {
- columnIndex = reader.readColumnIndex(meta);
- } catch (IOException e) {
- // If the I/O issue still stands it will fail the reading later;
- // otherwise we fail the filtering only with a missing column index.
- LOGGER.warn("Unable to read column index for column {}", meta.getPath(), e);
- }
- columnIndexRead = true;
- }
- return columnIndex;
- }
-
- @Override
- public OffsetIndex getOffsetIndex() {
- return offsetIndex;
- }
- }
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ColumnIndexStoreImpl.class);
- // Used for columns are not in this parquet file
- private static final IndexStore MISSING_INDEX_STORE =
- new IndexStore() {
- @Override
- public ColumnIndex getColumnIndex() {
- return null;
- }
-
- @Override
- public OffsetIndex getOffsetIndex() {
- return null;
- }
- };
- private static final ColumnIndexStoreImpl EMPTY =
- new ColumnIndexStoreImpl(null, new BlockMetaData(), emptySet()) {
- @Override
- public ColumnIndex getColumnIndex(ColumnPath column) {
- return null;
- }
-
- @Override
- public OffsetIndex getOffsetIndex(ColumnPath column) {
- throw new MissingOffsetIndexException(column);
- }
- };
-
- private final ParquetFileReader reader;
- private final Map<ColumnPath, IndexStore> store;
-
- /*
- * Creates a column index store which lazily reads column/offset indexes for the columns in paths. (paths are the set
- * of columns used for the projection)
- */
- static ColumnIndexStore create(
- ParquetFileReader reader, BlockMetaData block, Set<ColumnPath> paths) {
- try {
- return new ColumnIndexStoreImpl(reader, block, paths);
- } catch (MissingOffsetIndexException e) {
- return EMPTY;
- }
- }
-
- private ColumnIndexStoreImpl(
- ParquetFileReader reader, BlockMetaData block, Set<ColumnPath> paths) {
- // TODO[GS]: Offset index for every paths will be required; pre-read the consecutive ones at
- // once?
- // TODO[GS]: Pre-read column index based on filter?
- this.reader = reader;
- Map<ColumnPath, IndexStore> store = new HashMap<>();
- for (ColumnChunkMetaData column : block.getColumns()) {
- ColumnPath path = column.getPath();
- if (paths.contains(path)) {
- store.put(path, new IndexStoreImpl(column));
- }
- }
- this.store = store;
- }
-
- @Override
- public ColumnIndex getColumnIndex(ColumnPath column) {
- return store.getOrDefault(column, MISSING_INDEX_STORE).getColumnIndex();
- }
-
- @Override
- public OffsetIndex getOffsetIndex(ColumnPath column) {
- return store.getOrDefault(column, MISSING_INDEX_STORE).getOffsetIndex();
- }
-}
diff --git a/src/main/java/org/apache/parquet/local/DictionaryPageReader.java b/src/main/java/org/apache/parquet/local/DictionaryPageReader.java
deleted file mode 100644
index 9072da6..0000000
--- a/src/main/java/org/apache/parquet/local/DictionaryPageReader.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.local;
-
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.DictionaryPageReadStore;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.io.ParquetDecodingException;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * A {@link DictionaryPageReadStore} implementation that reads dictionaries from an open {@link
- * ParquetFileReader}.
- *
- * This implementation will delegate dictionary reads to a {@link ColumnChunkPageReadStore} to
- * avoid extra reads after a row group has been loaded into memory.
- */
-class DictionaryPageReader implements DictionaryPageReadStore {
-
- private final ParquetFileReader reader;
- private final Map<String, ColumnChunkMetaData> columns;
- private final Map<String, Optional<DictionaryPage>> dictionaryPageCache;
- private ColumnChunkPageReadStore rowGroup = null;
-
- /**
- * Instantiate a new DictionaryPageReader.
- *
- * @param reader The target ParquetFileReader
- * @param block The target BlockMetaData
- * @throws NullPointerException if {@code reader} or {@code block} is {@code null}
- */
- DictionaryPageReader(ParquetFileReader reader, BlockMetaData block) {
- this.reader = Objects.requireNonNull(reader);
- this.columns = new HashMap<>();
- this.dictionaryPageCache = new ConcurrentHashMap<>();
-
- for (ColumnChunkMetaData column : block.getColumns()) {
- columns.put(column.getPath().toDotString(), column);
- }
- }
-
- /**
- * Sets this reader's row group's page store. When a row group is set, this reader will delegate
- * to that row group to return dictionary pages. This avoids seeking and re-reading dictionary
- * bytes after this reader's row group is loaded into memory.
- *
- * @param rowGroup a ColumnChunkPageReadStore for this reader's row group
- */
- void setRowGroup(ColumnChunkPageReadStore rowGroup) {
- this.rowGroup = rowGroup;
- }
-
- @Override
- public DictionaryPage readDictionaryPage(ColumnDescriptor descriptor) {
- if (rowGroup != null) {
- // if the row group has already been read, use that dictionary
- return rowGroup.readDictionaryPage(descriptor);
- }
-
- String dotPath = String.join(".", descriptor.getPath());
- ColumnChunkMetaData column = columns.get(dotPath);
- if (column == null) {
- throw new ParquetDecodingException("Failed to load dictionary, unknown column: " + dotPath);
- }
-
- return dictionaryPageCache
- .computeIfAbsent(
- dotPath,
- key -> {
- try {
- final DictionaryPage dict =
- column.hasDictionaryPage() ? reader.readDictionary(column) : null;
-
- // Copy the dictionary to ensure it can be reused if it is returned
- // more than once. This can happen when a DictionaryFilter has two or
- // more predicates for the same column. Cache misses as well.
- return (dict != null) ? Optional.of(reusableCopy(dict)) : Optional.empty();
- } catch (IOException e) {
- throw new ParquetDecodingException("Failed to read dictionary", e);
- }
- })
- .orElse(null);
- }
-
- private static DictionaryPage reusableCopy(DictionaryPage dict) throws IOException {
- return new DictionaryPage(
- BytesInput.from(dict.getBytes().toByteArray()),
- dict.getDictionarySize(),
- dict.getEncoding());
- }
-}
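The dictionary cache above stores a deep copy (reusableCopy) because a streaming BytesInput is intended to be consumed once; materializing the bytes makes the cached DictionaryPage safe to return repeatedly, for example when a DictionaryFilter evaluates several predicates against one column. A sketch of that copy, assuming the standard DictionaryPage/BytesInput API; the helper name is illustrative:

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.page.DictionaryPage;

import java.io.IOException;

final class DictionaryCopySketch {
  static DictionaryPage copyForReuse(DictionaryPage dict) throws IOException {
    byte[] materialized = dict.getBytes().toByteArray(); // drains the underlying input once
    // the copy wraps a plain byte[], so it can be handed out any number of times
    return new DictionaryPage(
        BytesInput.from(materialized), dict.getDictionarySize(), dict.getEncoding());
  }
}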
diff --git a/src/main/java/org/apache/parquet/local/ParquetFileReader.java b/src/main/java/org/apache/parquet/local/ParquetFileReader.java
deleted file mode 100644
index 05c99a7..0000000
--- a/src/main/java/org/apache/parquet/local/ParquetFileReader.java
+++ /dev/null
@@ -1,1345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/*
- * copied from parquet-mr, updated by An Qi
- */
-
-package org.apache.parquet.local;
-
-import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.*;
-import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
-import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
-import org.apache.parquet.crypto.*;
-import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.format.*;
-import org.apache.parquet.hadoop.ParquetEmptyBlockException;
-import org.apache.parquet.hadoop.metadata.*;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
-import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
-import org.apache.parquet.internal.filter2.columnindex.RowRanges;
-import org.apache.parquet.internal.hadoop.metadata.IndexReference;
-import org.apache.parquet.io.InputFile;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.SeekableInputStream;
-import org.apache.parquet.local.ColumnChunkPageReadStore.ColumnChunkPageReader;
-import org.apache.parquet.local.ColumnIndexFilterUtils.OffsetRange;
-import org.apache.parquet.local.filter2.compat.RowGroupFilter;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.yetus.audience.InterfaceAudience.Private;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.SequenceInputStream;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.zip.CRC32;
-
-import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
-import static org.apache.parquet.format.Util.readFileCryptoMetaData;
-import static org.apache.parquet.local.ColumnIndexFilterUtils.calculateOffsetRanges;
-import static org.apache.parquet.local.ColumnIndexFilterUtils.filterOffsetIndex;
-import static org.apache.parquet.local.ParquetFileWriter.EFMAGIC;
-import static org.apache.parquet.local.ParquetFileWriter.MAGIC;
-
-/** Internal implementation of the Parquet file reader as a block container */
-public class ParquetFileReader implements Closeable {
-
- private static final Logger LOG = LoggerFactory.getLogger(ParquetFileReader.class);
-
- private final ParquetMetadataConverter converter;
-
- private final CRC32 crc;
-
- public static ParquetMetadata readFooter(
- InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
- ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
- return readFooter(file, options, f, converter);
- }
-
- private static ParquetMetadata readFooter(
- InputFile file,
- ParquetReadOptions options,
- SeekableInputStream f,
- ParquetMetadataConverter converter)
- throws IOException {
-
- long fileLen = file.getLength();
- String filePath = file.toString();
- LOG.debug("File length {}", fileLen);
-
- int FOOTER_LENGTH_SIZE = 4;
- if (fileLen
- < MAGIC.length
- + FOOTER_LENGTH_SIZE
- + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
- throw new RuntimeException(
- filePath + " is not a Parquet file (length is too low: " + fileLen + ")");
- }
-
- // Read footer length and magic string - with a single seek
- byte[] magic = new byte[MAGIC.length];
- long fileMetadataLengthIndex = fileLen - magic.length - FOOTER_LENGTH_SIZE;
- LOG.debug("reading footer index at {}", fileMetadataLengthIndex);
- f.seek(fileMetadataLengthIndex);
- int fileMetadataLength = readIntLittleEndian(f);
- f.readFully(magic);
-
- boolean encryptedFooterMode;
- if (Arrays.equals(MAGIC, magic)) {
- encryptedFooterMode = false;
- } else if (Arrays.equals(EFMAGIC, magic)) {
- encryptedFooterMode = true;
- } else {
- throw new RuntimeException(
- filePath
- + " is not a Parquet file. Expected magic number at tail, but found "
- + Arrays.toString(magic));
- }
-
- long fileMetadataIndex = fileMetadataLengthIndex - fileMetadataLength;
- LOG.debug("read footer length: {}, footer index: {}", fileMetadataLength, fileMetadataIndex);
- if (fileMetadataIndex < magic.length || fileMetadataIndex >= fileMetadataLengthIndex) {
- throw new RuntimeException(
- "corrupted file: the footer index is not within the file: " + fileMetadataIndex);
- }
- f.seek(fileMetadataIndex);
-
- FileDecryptionProperties fileDecryptionProperties = options.getDecryptionProperties();
- InternalFileDecryptor fileDecryptor = null;
- if (null != fileDecryptionProperties) {
- fileDecryptor = new InternalFileDecryptor(fileDecryptionProperties);
- }
-
- // Read all the footer bytes in one time to avoid multiple read operations,
- // since it can be pretty time consuming for a single read operation in HDFS.
- ByteBuffer footerBytesBuffer = ByteBuffer.allocate(fileMetadataLength);
- f.readFully(footerBytesBuffer);
- LOG.debug("Finished to read all footer bytes.");
- footerBytesBuffer.flip();
- InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
-
- // Regular file, or encrypted file with plaintext footer
- if (!encryptedFooterMode) {
- return converter.readParquetMetadata(
- footerBytesStream, options.getMetadataFilter(), fileDecryptor, false, fileMetadataLength);
- }
-
- // Encrypted file with encrypted footer
- if (null == fileDecryptor) {
- throw new ParquetCryptoRuntimeException(
- "Trying to read file with encrypted footer. No keys available");
- }
- FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
- fileDecryptor.setFileCryptoMetaData(
- fileCryptoMetaData.getEncryption_algorithm(), true, fileCryptoMetaData.getKey_metadata());
- // footer length is required only for signed plaintext footers
- return converter.readParquetMetadata(
- footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
- }
-
- protected final SeekableInputStream f;
- private final InputFile file;
- private final ParquetReadOptions options;
- private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
- private final FileMetaData fileMetaData; // may be null
- private final List<BlockMetaData> blocks;
- private final List<ColumnIndexStore> blockIndexStores;
- private final List<RowRanges> blockRowRanges;
-
- // not final. in some cases, this may be lazily loaded for backward-compat.
- private final ParquetMetadata footer;
-
- private int currentBlock = 0;
- private ColumnChunkPageReadStore currentRowGroup = null;
- private DictionaryPageReader nextDictionaryReader = null;
-
- private InternalFileDecryptor fileDecryptor = null;
-
- public ParquetFileReader(InputFile file, ParquetMetadata footer, ParquetReadOptions options)
- throws IOException {
- this.converter = new ParquetMetadataConverter(options);
- this.file = file;
- this.options = options;
- this.f = file.newStream();
- try {
- this.footer = footer;
- this.fileMetaData = footer.getFileMetaData();
- this.fileDecryptor =
- fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
- if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
- this.fileDecryptor = null; // Plaintext file. No need in decryptor
- }
-
- this.blocks = filterRowGroups(footer.getBlocks());
- this.blockIndexStores = listWithNulls(this.blocks.size());
- this.blockRowRanges = listWithNulls(this.blocks.size());
- for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
- paths.put(ColumnPath.get(col.getPath()), col);
- }
- this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
- } catch (Exception e) {
- f.close();
- throw e;
- }
- }
-
- private static <T> List<T> listWithNulls(int size) {
- return new ArrayList<>(Collections.nCopies(size, null));
- }
-
- public FileMetaData getFileMetaData() {
- return fileMetaData;
- }
-
- public long getRecordCount() {
- long total = 0L;
- for (BlockMetaData block : blocks) {
- total += block.getRowCount();
- }
- return total;
- }
-
- public long getFilteredRecordCount() {
- if (!options.useColumnIndexFilter()
- || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
- return getRecordCount();
- }
- long total = 0L;
- for (int i = 0, n = blocks.size(); i < n; ++i) {
- total += getRowRanges(i).rowCount();
- }
- return total;
- }
-
- public String getFile() {
- return file.toString();
- }
-
- public List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
- FilterCompat.Filter recordFilter = options.getRecordFilter();
- if (FilterCompat.isFilteringRequired(recordFilter)) {
- // set up data filters based on configured levels
- List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
-
- if (options.useStatsFilter()) {
- levels.add(RowGroupFilter.FilterLevel.STATISTICS);
- }
-
- if (options.useDictionaryFilter()) {
- levels.add(RowGroupFilter.FilterLevel.DICTIONARY);
- }
-
- if (options.useBloomFilter()) {
- levels.add(RowGroupFilter.FilterLevel.BLOOMFILTER);
- }
- return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
- }
-
- return blocks;
- }
-
- public List<BlockMetaData> getRowGroups() {
- return blocks;
- }
-
- private MessageType requestedSchema = null;
-
- public void setRequestedSchema(MessageType projection) {
- requestedSchema = projection;
- paths.clear();
- for (ColumnDescriptor col : projection.getColumns()) {
- paths.put(ColumnPath.get(col.getPath()), col);
- }
- }
-
- public MessageType getRequestedSchema() {
- if (requestedSchema == null) {
- return fileMetaData.getSchema();
- }
- return requestedSchema;
- }
-
- /**
- * Reads all the columns requested from the row group at the specified block.
- *
- * @param blockIndex the index of the requested block
- * @return the PageReadStore which can provide PageReaders for each column.
- * @throws IOException if an error occurs while reading
- */
- public PageReadStore readRowGroup(int blockIndex) throws IOException {
- return internalReadRowGroup(blockIndex);
- }
-
- /**
- * Reads all the columns requested from the row group at the current file position.
- *
- * @return the PageReadStore which can provide PageReaders for each column.
- * @throws IOException if an error occurs while reading
- */
- public PageReadStore readNextRowGroup() throws IOException {
- ColumnChunkPageReadStore rowGroup = null;
- try {
- rowGroup = internalReadRowGroup(currentBlock);
- } catch (ParquetEmptyBlockException e) {
- LOG.warn("Read empty block at index {} from {}", currentBlock, getFile());
- advanceToNextBlock();
- return readNextRowGroup();
- }
-
- if (rowGroup == null) {
- return null;
- }
- this.currentRowGroup = rowGroup;
- // avoid re-reading bytes the dictionary reader is used after this call
- if (nextDictionaryReader != null) {
- nextDictionaryReader.setRowGroup(currentRowGroup);
- }
-
- advanceToNextBlock();
-
- return currentRowGroup;
- }
-
- private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException {
- if (blockIndex < 0 || blockIndex >= blocks.size()) {
- return null;
- }
- BlockMetaData block = blocks.get(blockIndex);
- if (block.getRowCount() == 0) {
- throw new ParquetEmptyBlockException("Illegal row group of 0 rows");
- }
- org.apache.parquet.local.ColumnChunkPageReadStore rowGroup =
- new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
- // prepare the list of consecutive parts to read them in one scan
- List<ConsecutivePartList> allParts = new ArrayList<>();
- ConsecutivePartList currentParts = null;
- for (ColumnChunkMetaData mc : block.getColumns()) {
- ColumnPath pathKey = mc.getPath();
- ColumnDescriptor columnDescriptor = paths.get(pathKey);
- if (columnDescriptor != null) {
- long startingPos = mc.getStartingPos();
- // first part or not consecutive => new list
- if (currentParts == null || currentParts.endPos() != startingPos) {
- currentParts = new ConsecutivePartList(startingPos);
- allParts.add(currentParts);
- }
- currentParts.addChunk(
- new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
- }
- }
- // actually read all the chunks
- ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
- for (ConsecutivePartList consecutiveChunks : allParts) {
- consecutiveChunks.readAll(f, builder);
- }
- for (Chunk chunk : builder.build()) {
- readChunkPages(chunk, block, rowGroup);
- }
-
- return rowGroup;
- }
-
- /**
- * Reads all the columns requested from the specified row group. It may skip specific pages based
- * on the column indexes according to the actual filter. As the rows are not aligned among the
- * pages of the different columns row synchronization might be required. See the documentation of
- * the class SynchronizingColumnReader for details.
- *
- * @param blockIndex the index of the requested block
- * @return the PageReadStore which can provide PageReaders for each column or null if there are no
- * rows in this block
- * @throws IOException if an error occurs while reading
- */
- public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {
- if (blockIndex < 0 || blockIndex >= blocks.size()) {
- return null;
- }
-
- // Filtering not required -> fall back to the non-filtering path
- if (!options.useColumnIndexFilter()
- || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
- return internalReadRowGroup(blockIndex);
- }
-
- BlockMetaData block = blocks.get(blockIndex);
- if (block.getRowCount() == 0) {
- throw new ParquetEmptyBlockException("Illegal row group of 0 rows");
- }
-
- RowRanges rowRanges = getRowRanges(blockIndex);
- return readFilteredRowGroup(blockIndex, rowRanges);
- }
-
- /**
- * Reads all the columns requested from the specified row group. It may skip specific pages based
- * on the {@code rowRanges} passed in. As the rows are not aligned among the pages of the
- * different columns row synchronization might be required. See the documentation of the class
- * SynchronizingColumnReader for details.
- *
- * @param blockIndex the index of the requested block
- * @param rowRanges the row ranges to be read from the requested block
- * @return the PageReadStore which can provide PageReaders for each column or null if there are no
- * rows in this block
- * @throws IOException if an error occurs while reading
- * @throws IllegalArgumentException if the {@code blockIndex} is invalid or the {@code rowRanges}
- * is null
- */
- public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges rowRanges)
- throws IOException {
- if (blockIndex < 0 || blockIndex >= blocks.size()) {
- throw new IllegalArgumentException(
- String.format(
- "Invalid block index %s, the valid block index range are: " + "[%s, %s]",
- blockIndex, 0, blocks.size() - 1));
- }
-
- if (Objects.isNull(rowRanges)) {
- throw new IllegalArgumentException("RowRanges must not be null");
- }
-
- BlockMetaData block = blocks.get(blockIndex);
- if (block.getRowCount() == 0L) {
- return null;
- }
-
- long rowCount = rowRanges.rowCount();
- if (rowCount == 0) {
- // There are no matching rows -> returning null
- return null;
- }
-
- if (rowCount == block.getRowCount()) {
- // All rows are matching -> fall back to the non-filtering path
- return internalReadRowGroup(blockIndex);
- }
-
- return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
- }
-
- /**
- * Reads all the columns requested from the row group at the current file position. It may skip
- * specific pages based on the column indexes according to the actual filter. As the rows are not
- * aligned among the pages of the different columns, row synchronization might be required. See the
- * documentation of the class SynchronizingColumnReader for details.
- *
- * @return the PageReadStore which can provide PageReaders for each column
- * @throws IOException if an error occurs while reading
- */
- public PageReadStore readNextFilteredRowGroup() throws IOException {
- if (currentBlock == blocks.size()) {
- return null;
- }
- // Filtering not required -> fall back to the non-filtering path
- if (!options.useColumnIndexFilter()
- || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
- return readNextRowGroup();
- }
- BlockMetaData block = blocks.get(currentBlock);
- if (block.getRowCount() == 0L) {
- LOG.warn("Read empty block at index {} from {}", currentBlock, getFile());
- // Skip the empty block
- advanceToNextBlock();
- return readNextFilteredRowGroup();
- }
- RowRanges rowRanges = getRowRanges(currentBlock);
- long rowCount = rowRanges.rowCount();
- if (rowCount == 0) {
- // There are no matching rows -> skipping this row-group
- advanceToNextBlock();
- return readNextFilteredRowGroup();
- }
- if (rowCount == block.getRowCount()) {
- // All rows are matching -> fall back to the non-filtering path
- return readNextRowGroup();
- }
-
- this.currentRowGroup =
- internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(currentBlock));
-
- // avoid re-reading bytes: the dictionary reader is used after this call
- if (nextDictionaryReader != null) {
- nextDictionaryReader.setRowGroup(currentRowGroup);
- }
-
- advanceToNextBlock();
-
- return this.currentRowGroup;
- }
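- // Illustrative sketch (a minimal example; "reader" is an assumed name for an instance of this
- // class): the filtered variant is consumed the same way as readNextRowGroup(), but each
- // returned store only exposes the pages that overlap the matching row ranges.
- //
- //   PageReadStore filtered;
- //   while ((filtered = reader.readNextFilteredRowGroup()) != null) {
- //     // filtered.getRowCount() reflects only the rows that match the record filter
- //   }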
-
- private ColumnChunkPageReadStore internalReadFilteredRowGroup(
- BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException {
- ColumnChunkPageReadStore rowGroup =
- new ColumnChunkPageReadStore(rowRanges, block.getRowIndexOffset());
- // prepare the list of consecutive parts to read them in one scan
- ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
- List<ConsecutivePartList> allParts = new ArrayList<>();
- ConsecutivePartList currentParts = null;
- for (ColumnChunkMetaData mc : block.getColumns()) {
- ColumnPath pathKey = mc.getPath();
- ColumnDescriptor columnDescriptor = paths.get(pathKey);
- if (columnDescriptor != null) {
- OffsetIndex offsetIndex = ciStore.getOffsetIndex(mc.getPath());
-
- OffsetIndex filteredOffsetIndex =
- filterOffsetIndex(offsetIndex, rowRanges, block.getRowCount());
- for (OffsetRange range :
- calculateOffsetRanges(filteredOffsetIndex, mc, offsetIndex.getOffset(0))) {
- long startingPos = range.getOffset();
- // first part or not consecutive => new list
- if (currentParts == null || currentParts.endPos() != startingPos) {
- currentParts = new ConsecutivePartList(startingPos);
- allParts.add(currentParts);
- }
- ChunkDescriptor chunkDescriptor =
- new ChunkDescriptor(columnDescriptor, mc, startingPos, range.getLength());
- currentParts.addChunk(chunkDescriptor);
- builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
- }
- }
- }
- // actually read all the chunks
- for (ConsecutivePartList consecutiveChunks : allParts) {
- consecutiveChunks.readAll(f, builder);
- }
- for (Chunk chunk : builder.build()) {
- readChunkPages(chunk, block, rowGroup);
- }
-
- return rowGroup;
- }
-
- private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageReadStore rowGroup)
- throws IOException {
- if (null == fileDecryptor || fileDecryptor.plaintextFile()) {
- rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
- return;
- }
- // Encrypted file
- ColumnPath columnPath = ColumnPath.get(chunk.descriptor.col.getPath());
- InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(columnPath);
- if (!columnDecryptionSetup.isEncrypted()) { // plaintext column
- rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
- } else { // encrypted column
- rowGroup.addColumn(
- chunk.descriptor.col,
- chunk.readAllPages(
- columnDecryptionSetup.getMetaDataDecryptor(),
- columnDecryptionSetup.getDataDecryptor(),
- fileDecryptor.getFileAAD(),
- block.getOrdinal(),
- columnDecryptionSetup.getOrdinal()));
- }
- }
-
- public ColumnIndexStore getColumnIndexStore(int blockIndex) {
- ColumnIndexStore ciStore = blockIndexStores.get(blockIndex);
- if (ciStore == null) {
- ciStore =
- org.apache.parquet.local.ColumnIndexStoreImpl.create(
- this, blocks.get(blockIndex), paths.keySet());
- blockIndexStores.set(blockIndex, ciStore);
- }
- return ciStore;
- }
-
- private RowRanges getRowRanges(int blockIndex) {
- assert FilterCompat.isFilteringRequired(options.getRecordFilter())
- : "Should not be invoked if filter is null or NOOP";
- RowRanges rowRanges = blockRowRanges.get(blockIndex);
- if (rowRanges == null) {
- rowRanges =
- ColumnIndexFilter.calculateRowRanges(
- options.getRecordFilter(),
- getColumnIndexStore(blockIndex),
- paths.keySet(),
- blocks.get(blockIndex).getRowCount());
- blockRowRanges.set(blockIndex, rowRanges);
- }
- return rowRanges;
- }
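- // The computed ranges are cached per block, so readFilteredRowGroup() and
- // readNextFilteredRowGroup() do not re-evaluate the column-index filter for the same row group.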
-
- public boolean skipNextRowGroup() {
- return advanceToNextBlock();
- }
-
- private boolean advanceToNextBlock() {
- if (currentBlock == blocks.size()) {
- return false;
- }
-
- // update the current block and instantiate a dictionary reader for it
- ++currentBlock;
- this.nextDictionaryReader = null;
-
- return true;
- }
-
- /**
- * Returns a {@link DictionaryPageReadStore} for the row group that would be returned by calling
- * {@link #readNextRowGroup()} or skipped by calling {@link #skipNextRowGroup()}.
- *
- * @return a DictionaryPageReadStore for the next row group
- */
- public DictionaryPageReadStore getNextDictionaryReader() {
- if (nextDictionaryReader == null) {
- this.nextDictionaryReader = getDictionaryReader(currentBlock);
- }
- return nextDictionaryReader;
- }
-
- public DictionaryPageReader getDictionaryReader(int blockIndex) {
- if (blockIndex < 0 || blockIndex >= blocks.size()) {
- return null;
- }
- return new DictionaryPageReader(this, blocks.get(blockIndex));
- }
-
- public DictionaryPageReader getDictionaryReader(BlockMetaData block) {
- return new DictionaryPageReader(this, block);
- }
-
- /**
- * Reads and decompresses a dictionary page for the given column chunk.
- *
- * Returns null if the given column chunk has no dictionary page.
- *
- * @param meta a column's ColumnChunkMetaData to read the dictionary from
- * @return an uncompressed DictionaryPage or null
- * @throws IOException if there is an error while reading the dictionary
- */
- DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException {
- if (!meta.hasDictionaryPage()) {
- return null;
- }
-
- // TODO: this should use getDictionaryPageOffset() but it isn't reliable.
- if (f.getPos() != meta.getStartingPos()) {
- f.seek(meta.getStartingPos());
- }
-
- boolean encryptedColumn = false;
- InternalColumnDecryptionSetup columnDecryptionSetup = null;
- byte[] dictionaryPageAAD = null;
- BlockCipher.Decryptor pageDecryptor = null;
- if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
- columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath());
- if (columnDecryptionSetup.isEncrypted()) {
- encryptedColumn = true;
- }
- }
-
- PageHeader pageHeader;
- if (!encryptedColumn) {
- pageHeader = Util.readPageHeader(f);
- } else {
- byte[] dictionaryPageHeaderAAD =
- AesCipher.createModuleAAD(
- fileDecryptor.getFileAAD(),
- ModuleType.DictionaryPageHeader,
- meta.getRowGroupOrdinal(),
- columnDecryptionSetup.getOrdinal(),
- -1);
- pageHeader =
- Util.readPageHeader(
- f, columnDecryptionSetup.getMetaDataDecryptor(), dictionaryPageHeaderAAD);
- dictionaryPageAAD =
- AesCipher.createModuleAAD(
- fileDecryptor.getFileAAD(),
- ModuleType.DictionaryPage,
- meta.getRowGroupOrdinal(),
- columnDecryptionSetup.getOrdinal(),
- -1);
- pageDecryptor = columnDecryptionSetup.getDataDecryptor();
- }
-
- if (!pageHeader.isSetDictionary_page_header()) {
- return null; // TODO: should this complain?
- }
-
- DictionaryPage compressedPage =
- readCompressedDictionary(pageHeader, f, pageDecryptor, dictionaryPageAAD);
- BytesInputDecompressor decompressor =
- options.getCodecFactory().getDecompressor(meta.getCodec());
-
- return new DictionaryPage(
- decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()),
- compressedPage.getDictionarySize(),
- compressedPage.getEncoding());
- }
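- // Note on readDictionary: the dictionary page is decrypted and decompressed eagerly via the
- // codec factory, whereas readAllPages (in Chunk, below) hands data pages to
- // ColumnChunkPageReader still compressed, together with the decompressor and page decryptor.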
-
- private DictionaryPage readCompressedDictionary(
- PageHeader pageHeader,
- SeekableInputStream fin,
- BlockCipher.Decryptor pageDecryptor,
- byte[] dictionaryPageAAD)
- throws IOException {
- DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
-
- int uncompressedPageSize = pageHeader.getUncompressed_page_size();
- int compressedPageSize = pageHeader.getCompressed_page_size();
-
- byte[] dictPageBytes = new byte[compressedPageSize];
- fin.readFully(dictPageBytes);
-
- BytesInput bin = BytesInput.from(dictPageBytes);
-
- if (null != pageDecryptor) {
- bin = BytesInput.from(pageDecryptor.decrypt(bin.toByteArray(), dictionaryPageAAD));
- }
-
- return new DictionaryPage(
- bin,
- uncompressedPageSize,
- dictHeader.getNum_values(),
- converter.getEncoding(dictHeader.getEncoding()));
- }
-
- public BloomFilterReader getBloomFilterDataReader(int blockIndex) {
- if (blockIndex < 0 || blockIndex >= blocks.size()) {
- return null;
- }
- return new BloomFilterReader(this, blocks.get(blockIndex));
- }
-
- public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
- return new BloomFilterReader(this, block);
- }
-
- /**
- * Reads Bloom filter data for the given column chunk.
- *
- * @param meta a column's ColumnChunkMetaData to read the Bloom filter from
- * @return a BloomFilter object, or {@code null} if the column has no readable Bloom filter
- * @throws IOException if there is an error while reading the Bloom filter.
- */
- public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
- long bloomFilterOffset = meta.getBloomFilterOffset();
- if (bloomFilterOffset < 0) {
- return null;
- }
-
- // Prepare to decrypt Bloom filter (for encrypted columns)
- BlockCipher.Decryptor bloomFilterDecryptor = null;
- byte[] bloomFilterHeaderAAD = null;
- byte[] bloomFilterBitsetAAD = null;
- if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
- InternalColumnDecryptionSetup columnDecryptionSetup =
- fileDecryptor.getColumnSetup(meta.getPath());
- if (columnDecryptionSetup.isEncrypted()) {
- bloomFilterDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
- bloomFilterHeaderAAD =
- AesCipher.createModuleAAD(
- fileDecryptor.getFileAAD(),
- ModuleType.BloomFilterHeader,
- meta.getRowGroupOrdinal(),
- columnDecryptionSetup.getOrdinal(),
- -1);
- bloomFilterBitsetAAD =
- AesCipher.createModuleAAD(
- fileDecryptor.getFileAAD(),
- ModuleType.BloomFilterBitset,
- meta.getRowGroupOrdinal(),
- columnDecryptionSetup.getOrdinal(),
- -1);
- }
- }
-
- // Read Bloom filter data header.
- f.seek(bloomFilterOffset);
- BloomFilterHeader bloomFilterHeader;
- try {
- bloomFilterHeader = Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD);
- } catch (IOException e) {
- LOG.warn("read no bloom filter");
- return null;
- }
-
- int numBytes = bloomFilterHeader.getNumBytes();
- if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.UPPER_BOUND_BYTES) {
- LOG.warn("the read bloom filter size is wrong, size is {}", bloomFilterHeader.getNumBytes());
- return null;
- }
-
- if (!bloomFilterHeader.getHash().isSetXXHASH()
- || !bloomFilterHeader.getAlgorithm().isSetBLOCK()
- || !bloomFilterHeader.getCompression().isSetUNCOMPRESSED()) {
- LOG.warn(
- "the read bloom filter is not supported yet, algorithm = {}, hash = {}, compression = {}",
- bloomFilterHeader.getAlgorithm(),
- bloomFilterHeader.getHash(),
- bloomFilterHeader.getCompression());
- return null;
- }
-
- byte[] bitset;
- if (null == bloomFilterDecryptor) {
- bitset = new byte[numBytes];
- f.readFully(bitset);
- } else {
- bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
- if (bitset.length != numBytes) {
- throw new ParquetCryptoRuntimeException("Wrong length of decrypted bloom filter bitset");
- }
- }
- return new BlockSplitBloomFilter(bitset);
- }
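- // Illustrative sketch (a minimal example; "reader", "columnMeta", and "candidate" are assumed
- // names, and BloomFilter#hash / #findHash are assumed to be the parquet-mr hashing entry points):
- //
- //   BloomFilter bf = reader.readBloomFilter(columnMeta);
- //   long candidate = 42L;
- //   if (bf != null && !bf.findHash(bf.hash(candidate))) {
- //     // the value is definitely absent from this column chunk -> the row group can be skipped
- //   }
- //
- // A Bloom filter can only prove absence; a positive findHash() result may be a false positive.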
-
- /**
- * @param column the column chunk which the column index is to be returned for
- * @return the column index for the specified column chunk or {@code null} if there is no index
- * @throws IOException if any I/O error occurs during reading the file
- */
- @Private
- public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOException {
- IndexReference ref = column.getColumnIndexReference();
- if (ref == null) {
- return null;
- }
- f.seek(ref.getOffset());
-
- BlockCipher.Decryptor columnIndexDecryptor = null;
- byte[] columnIndexAAD = null;
- if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
- InternalColumnDecryptionSetup columnDecryptionSetup =
- fileDecryptor.getColumnSetup(column.getPath());
- if (columnDecryptionSetup.isEncrypted()) {
- columnIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
- columnIndexAAD =
- AesCipher.createModuleAAD(
- fileDecryptor.getFileAAD(),
- ModuleType.ColumnIndex,
- column.getRowGroupOrdinal(),
- columnDecryptionSetup.getOrdinal(),
- -1);
- }
- }
- return ParquetMetadataConverter.fromParquetColumnIndex(
- column.getPrimitiveType(), Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD));
- }
-
- /**
- * @param column the column chunk which the offset index is to be returned for
- * @return the offset index for the specified column chunk or {@code null} if there is no index
- * @throws IOException if any I/O error occurs during reading the file
- */
- @Private
- public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOException {
- IndexReference ref = column.getOffsetIndexReference();
- if (ref == null) {
- return null;
- }
- f.seek(ref.getOffset());
-
- BlockCipher.Decryptor offsetIndexDecryptor = null;
- byte[] offsetIndexAAD = null;
- if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
- InternalColumnDecryptionSetup columnDecryptionSetup =
- fileDecryptor.getColumnSetup(column.getPath());
- if (columnDecryptionSetup.isEncrypted()) {
- offsetIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
- offsetIndexAAD =
- AesCipher.createModuleAAD(
- fileDecryptor.getFileAAD(),
- ModuleType.OffsetIndex,
- column.getRowGroupOrdinal(),
- columnDecryptionSetup.getOrdinal(),
- -1);
- }
- }
- return ParquetMetadataConverter.fromParquetOffsetIndex(
- Util.readOffsetIndex(f, offsetIndexDecryptor, offsetIndexAAD));
- }
-
- @Override
- public void close() throws IOException {
- try {
- if (f != null) {
- f.close();
- }
- } finally {
- options.getCodecFactory().release();
- }
- }
-
- public ParquetMetadata getFooter() {
- return footer;
- }
-
- /*
- * Builder to concatenate the buffers of the discontinuous parts for the same column. These parts are generated as a
- * result of the column-index-based filtering, when some pages might be skipped during reading.
- */
- private class ChunkListBuilder {
- private class ChunkData {
- final List<ByteBuffer> buffers = new ArrayList<>();
- OffsetIndex offsetIndex;
- }
-
- private final Map<ChunkDescriptor, ChunkData> map = new HashMap<>();
- private ChunkDescriptor lastDescriptor;
- private final long rowCount;
- private SeekableInputStream f;
-
- public ChunkListBuilder(long rowCount) {
- this.rowCount = rowCount;
- }
-
- void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
- map.computeIfAbsent(descriptor, d -> new ChunkData()).buffers.addAll(buffers);
- lastDescriptor = descriptor;
- this.f = f;
- }
-
- void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) {
- map.computeIfAbsent(descriptor, d -> new ChunkData()).offsetIndex = offsetIndex;
- }
-
- List<Chunk> build() {
- Set<Entry<ChunkDescriptor, ChunkData>> entries = map.entrySet();
- List<Chunk> chunks = new ArrayList<>(entries.size());
- for (Entry<ChunkDescriptor, ChunkData> entry : entries) {
- ChunkDescriptor descriptor = entry.getKey();
- ChunkData data = entry.getValue();
- if (descriptor.equals(lastDescriptor)) {
- // because of a bug, the last chunk might be larger than descriptor.size
- chunks.add(
- new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount));
- } else {
- chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex, rowCount));
- }
- }
- return chunks;
- }
- }
-
- /** The data for a column chunk */
- private class Chunk {
-
- protected final ChunkDescriptor descriptor;
- protected final ByteBufferInputStream stream;
- final OffsetIndex offsetIndex;
- final long rowCount;
-
- /**
- * @param descriptor descriptor for the chunk
- * @param buffers ByteBuffers that contain the chunk
- * @param offsetIndex the offset index for this column; might be null
- */
- public Chunk(
- ChunkDescriptor descriptor,
- List<ByteBuffer> buffers,
- OffsetIndex offsetIndex,
- long rowCount) {
- this.descriptor = descriptor;
- this.stream = ByteBufferInputStream.wrap(buffers);
- this.offsetIndex = offsetIndex;
- this.rowCount = rowCount;
- }
-
- protected PageHeader readPageHeader() throws IOException {
- return readPageHeader(null, null);
- }
-
- protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD)
- throws IOException {
- return Util.readPageHeader(stream, blockDecryptor, pageHeaderAAD);
- }
-
- /**
- * Calculates the CRC32 checksum of the given bytes and throws a ParquetDecodingException if it
- * does not match the provided reference CRC.
- */
- private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) {
- crc.reset();
- crc.update(bytes);
- if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) {
- throw new ParquetDecodingException(exceptionMsg);
- }
- }
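- // Note on verifyCrc: the reference CRC arrives from the Thrift page header as a signed 32-bit
- // int; masking it with 0xffffffffL yields its unsigned value so it can be compared with
- // CRC32.getValue(), which returns the checksum in the low 32 bits of a long.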
-
- /**
- * Read all of the pages in a given column chunk.
- *
- * @return the list of pages
- */
- public ColumnChunkPageReader readAllPages() throws IOException {
- return readAllPages(null, null, null, -1, -1);
- }
-
- public ColumnChunkPageReader readAllPages(
- BlockCipher.Decryptor headerBlockDecryptor,
- BlockCipher.Decryptor pageBlockDecryptor,
- byte[] aadPrefix,
- int rowGroupOrdinal,
- int columnOrdinal)
- throws IOException {
- List<DataPage> pagesInChunk = new ArrayList<>();
- DictionaryPage dictionaryPage = null;
- PrimitiveType type =
- getFileMetaData().getSchema().getType(descriptor.col.getPath()).asPrimitiveType();
- long valuesCountReadSoFar = 0L;
- int dataPageCountReadSoFar = 0;
- byte[] dataPageHeaderAAD = null;
- if (null != headerBlockDecryptor) {
- dataPageHeaderAAD =
- AesCipher.createModuleAAD(
- aadPrefix,
- ModuleType.DataPageHeader,
- rowGroupOrdinal,
- columnOrdinal,
- getPageOrdinal(dataPageCountReadSoFar));
- }
- while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
- byte[] pageHeaderAAD = dataPageHeaderAAD;
- if (null != headerBlockDecryptor) {
- // Important: this verifies file integrity (makes sure dictionary page had not been
- // removed)
- if (null == dictionaryPage && descriptor.metadata.hasDictionaryPage()) {
- pageHeaderAAD =
- AesCipher.createModuleAAD(
- aadPrefix, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
- } else {
- int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar);
- AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
- }
- }
- PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD);
- int uncompressedPageSize = pageHeader.getUncompressed_page_size();
- int compressedPageSize = pageHeader.getCompressed_page_size();
- final BytesInput pageBytes;
- switch (pageHeader.type) {
- case DICTIONARY_PAGE:
- // there is only one dictionary page per column chunk
- if (dictionaryPage != null) {
- throw new ParquetDecodingException(
- "more than one dictionary page in column " + descriptor.col);
- }
- pageBytes = this.readAsBytesInput(compressedPageSize);
- if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
- verifyCrc(
- pageHeader.getCrc(),
- pageBytes.toByteArray(),
- "could not verify dictionary page integrity, CRC checksum verification failed");
- }
- DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
- dictionaryPage =
- new DictionaryPage(
- pageBytes,
- uncompressedPageSize,
- dicHeader.getNum_values(),
- converter.getEncoding(dicHeader.getEncoding()));
- // Copy crc to new page, used for testing
- if (pageHeader.isSetCrc()) {
- dictionaryPage.setCrc(pageHeader.getCrc());
- }
- break;
- case DATA_PAGE:
- DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
- pageBytes = this.readAsBytesInput(compressedPageSize);
- if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
- verifyCrc(
- pageHeader.getCrc(),
- pageBytes.toByteArray(),
- "could not verify page integrity, CRC checksum verification failed");
- }
- DataPageV1 dataPageV1 =
- new DataPageV1(
- pageBytes,
- dataHeaderV1.getNum_values(),
- uncompressedPageSize,
- converter.fromParquetStatistics(
- getFileMetaData().getCreatedBy(), dataHeaderV1.getStatistics(), type),
- converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
- converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
- converter.getEncoding(dataHeaderV1.getEncoding()));
- // Copy crc to new page, used for testing
- if (pageHeader.isSetCrc()) {
- dataPageV1.setCrc(pageHeader.getCrc());
- }
- pagesInChunk.add(dataPageV1);
- valuesCountReadSoFar += dataHeaderV1.getNum_values();
- ++dataPageCountReadSoFar;
- break;
- case DATA_PAGE_V2:
- DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
- int dataSize =
- compressedPageSize
- - dataHeaderV2.getRepetition_levels_byte_length()
- - dataHeaderV2.getDefinition_levels_byte_length();
- pagesInChunk.add(
- new DataPageV2(
- dataHeaderV2.getNum_rows(),
- dataHeaderV2.getNum_nulls(),
- dataHeaderV2.getNum_values(),
- this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
- this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
- converter.getEncoding(dataHeaderV2.getEncoding()),
- this.readAsBytesInput(dataSize),
- uncompressedPageSize,
- converter.fromParquetStatistics(
- getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), type),
- dataHeaderV2.isIs_compressed()));
- valuesCountReadSoFar += dataHeaderV2.getNum_values();
- ++dataPageCountReadSoFar;
- break;
- default:
- LOG.debug(
- "skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
- stream.skipFully(compressedPageSize);
- break;
- }
- }
- if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
- // Would be nice to have a CorruptParquetFileException or something as a subclass?
- throw new IOException(
- "Expected "
- + descriptor.metadata.getValueCount()
- + " values in column chunk at "
- + getFile()
- + " offset "
- + descriptor.metadata.getFirstDataPageOffset()
- + " but got "
- + valuesCountReadSoFar
- + " values instead over "
- + pagesInChunk.size()
- + " pages ending at file offset "
- + (descriptor.fileOffset + stream.position()));
- }
- BytesInputDecompressor decompressor =
- options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
- return new ColumnChunkPageReader(
- decompressor,
- pagesInChunk,
- dictionaryPage,
- offsetIndex,
- rowCount,
- pageBlockDecryptor,
- aadPrefix,
- rowGroupOrdinal,
- columnOrdinal);
- }
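- // Note: pages are collected here still compressed (and, for encrypted columns, still
- // encrypted); ColumnChunkPageReader receives the decompressor, the page decryptor, and the
- // AAD prefix, and applies them as the pages are actually consumed.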
-
- private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
- return offsetIndex == null
- ? valuesCountReadSoFar < descriptor.metadata.getValueCount()
- : dataPageCountReadSoFar < offsetIndex.getPageCount();
- }
-
- private int getPageOrdinal(int dataPageCountReadSoFar) {
- if (null == offsetIndex) {
- return dataPageCountReadSoFar;
- }
-
- return offsetIndex.getPageOrdinal(dataPageCountReadSoFar);
- }
-
- /**
- * @param size the number of bytes to read as a page
- * @return the page data as a BytesInput
- * @throws IOException if there is an error while reading from the file stream
- */
- public BytesInput readAsBytesInput(int size) throws IOException {
- return BytesInput.from(stream.sliceBuffers(size));
- }
- }
-
- /** Deals with a now-fixed bug where compressedLength was missing a few bytes. */
- private class WorkaroundChunk extends Chunk {
-
- private final SeekableInputStream f;
-
- /**
- * @param descriptor the descriptor of the chunk
- * @param f the file stream positioned at the end of this chunk
- */
- private WorkaroundChunk(
- ChunkDescriptor descriptor,
- List<ByteBuffer> buffers,
- SeekableInputStream f,
- OffsetIndex offsetIndex,
- long rowCount) {
- super(descriptor, buffers, offsetIndex, rowCount);
- this.f = f;
- }
-
- protected PageHeader readPageHeader() throws IOException {
- PageHeader pageHeader;
- stream.mark(8192); // headers should not be larger than 8k
- try {
- pageHeader = Util.readPageHeader(stream);
- } catch (IOException e) {
- // this is to workaround a bug where the compressedLength
- // of the chunk is missing the size of the header of the dictionary
- // to allow reading older files (using dictionary) we need this.
- // usually 13 to 19 bytes are missing
- // if the last page is smaller than this, the page header itself is truncated in the buffer.
- stream.reset(); // resetting the buffer to the position before we got the error
- LOG.info("completing the column chunk to read the page header");
- pageHeader =
- Util.readPageHeader(
- new SequenceInputStream(
- stream, f)); // trying again from the buffer + remainder of the stream.
- }
- return pageHeader;
- }
-
- public BytesInput readAsBytesInput(int size) throws IOException {
- int available = stream.available();
- if (size > available) {
- // this is to workaround a bug where the compressedLength
- // of the chunk is missing the size of the header of the dictionary
- // to allow reading older files (using dictionary) we need this.
- // usually 13 to 19 bytes are missing
- int missingBytes = size - available;
- LOG.info("completed the column chunk with {} bytes", missingBytes);
-
- List<ByteBuffer> streamBuffers = stream.sliceBuffers(available);
-
- ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes);
- f.readFully(lastBuffer);
-
- List<ByteBuffer> buffers = new ArrayList<>(streamBuffers.size() + 1);
- buffers.addAll(streamBuffers);
- buffers.add(lastBuffer);
-
- return BytesInput.from(buffers);
- }
-
- return super.readAsBytesInput(size);
- }
- }
-
- /** Information needed to read a column chunk or a part of it. */
- private static class ChunkDescriptor {
-
- private final ColumnDescriptor col;
- private final ColumnChunkMetaData metadata;
- private final long fileOffset;
- private final long size;
-
- /**
- * @param col column this chunk is part of
- * @param metadata metadata for the column
- * @param fileOffset offset in the file where this chunk starts
- * @param size size of the chunk
- */
- private ChunkDescriptor(
- ColumnDescriptor col, ColumnChunkMetaData metadata, long fileOffset, long size) {
- super();
- this.col = col;
- this.metadata = metadata;
- this.fileOffset = fileOffset;
- this.size = size;
- }
-
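- // equals() and hashCode() are intentionally based on the column descriptor only: when
- // column-index filtering splits one column chunk into several byte ranges, each range carries
- // an equal ChunkDescriptor, so ChunkListBuilder accumulates all of their buffers under a single
- // ChunkData entry.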
- @Override
- public int hashCode() {
- return col.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- } else if (obj instanceof ChunkDescriptor) {
- return col.equals(((ChunkDescriptor) obj).col);
- } else {
- return false;
- }
- }
- }
-
- /**
- * Describes a list of consecutive parts to be read at once. A consecutive part may contain whole
- * column chunks or only parts of them (some pages).
- */
- private class ConsecutivePartList {
-
- private final long offset;
- private long length;
- private final List<ChunkDescriptor> chunks = new ArrayList<>();
-
- /** @param offset where the first chunk starts */
- ConsecutivePartList(long offset) {
- this.offset = offset;
- }
-
- /**
- * Adds a chunk to the list. The chunk must be consecutive with the previous one.
- *
- * @param descriptor a chunk descriptor
- */
- public void addChunk(ChunkDescriptor descriptor) {
- chunks.add(descriptor);
- length += descriptor.size;
- }
-
- /**
- * @param f file to read the chunks from
- * @param builder used to build chunk list to read the pages for the different columns
- * @throws IOException if there is an error while reading from the stream
- */
- public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
- f.seek(offset);
-
- int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
- int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());
-
- int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
- List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
- for (int i = 0; i < fullAllocations; i += 1) {
- buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
- }
-
- if (lastAllocationSize > 0) {
- buffers.add(options.getAllocator().allocate(lastAllocationSize));
- }
-
- for (ByteBuffer buffer : buffers) {
- f.readFully(buffer);
- buffer.flip();
- }
-
- ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
- for (final ChunkDescriptor descriptor : chunks) {
- builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
- }
- }
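- // For example, if the max allocation size is 8 MB, a 20 MB consecutive part is read into two
- // full 8 MB buffers plus one 4 MB buffer; the wrapping ByteBufferInputStream is then sliced
- // back into per-chunk buffer lists. (Sizes are only an illustration; the actual limit comes
- // from options.getMaxAllocationSize().)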
-
- /** @return the position following the last byte of these chunks */
- public long endPos() {
- return offset + length;
- }
- }
-}
diff --git a/src/main/java/org/apache/parquet/local/ParquetFileWriter.java b/src/main/java/org/apache/parquet/local/ParquetFileWriter.java
deleted file mode 100644
index f85885c..0000000
--- a/src/main/java/org/apache/parquet/local/ParquetFileWriter.java
+++ /dev/null
@@ -1,1266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/*
- * copied from parquet-mr, updated by An Qi
- */
-
-package org.apache.parquet.local;
-
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.Version;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.EncodingStats;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.crypto.*;
-import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import org.apache.parquet.format.BlockCipher;
-import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.metadata.*;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
-import org.apache.parquet.internal.hadoop.metadata.IndexReference;
-import org.apache.parquet.io.OutputFile;
-import org.apache.parquet.io.ParquetEncodingException;
-import org.apache.parquet.io.PositionOutputStream;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.zip.CRC32;
-
-import static org.apache.parquet.format.Util.writeFileCryptoMetaData;
-import static org.apache.parquet.format.Util.writeFileMetaData;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
-
-/** Internal implementation of the Parquet file writer as a block container */
-public class ParquetFileWriter {
- private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class);
-
- private final ParquetMetadataConverter metadataConverter;
-
- public static final String PARQUET_METADATA_FILE = "_metadata";
- public static final String MAGIC_STR = "PAR1";
- public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
- public static final String EF_MAGIC_STR = "PARE";
- public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
- public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
- public static final int CURRENT_VERSION = 1;
-
- // File creation modes
- public static enum Mode {
- CREATE,
- OVERWRITE
- }
-
- protected final PositionOutputStream out;
-
- private final AlignmentStrategy alignment;
- private final int columnIndexTruncateLength;
-
- // file data
- private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-
- // The column/offset indexes per blocks per column chunks
- private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
- private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
-
- // The Bloom filters
- private final List