From dcb84c647cba46a197e82b39203d928d5ded45b9 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Thu, 7 Nov 2024 20:58:52 +0800 Subject: [PATCH] [parquet] Support to enable parquet bloomfilter --- .../paimon/format/FormatReadWriteTest.java | 4 +- .../format/parquet/ColumnConfigParser.java | 77 +++++++++++++++++ .../parquet/writer/RowDataParquetBuilder.java | 83 +++++++++++++------ .../parquet/ParquetFormatReadWriteTest.java | 52 ++++++++++++ 4 files changed, 187 insertions(+), 29 deletions(-) create mode 100644 paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java index d393a9192523e..d3114cee6d76d 100644 --- a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java @@ -60,8 +60,8 @@ public abstract class FormatReadWriteTest { private final String formatType; - private FileIO fileIO; - private Path file; + protected FileIO fileIO; + protected Path file; protected FormatReadWriteTest(String formatType) { this.formatType = formatType; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java new file mode 100644 index 0000000000000..a4e33807f6ee5 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.parquet; + +import org.apache.hadoop.conf.Configuration; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** + * Parses the specified key-values in the format of root.key#column.path from a {@link + * Configuration} object. + * + *

NOTE: The file was copied from Apache parquet project. + */ +public class ColumnConfigParser { + + private static class ConfigHelper { + private final String prefix; + private final Function function; + private final BiConsumer consumer; + + public ConfigHelper( + String prefix, Function function, BiConsumer consumer) { + this.prefix = prefix; + this.function = function; + this.consumer = consumer; + } + + public void processKey(String key) { + if (key.startsWith(prefix)) { + String columnPath = key.substring(prefix.length()); + T value = function.apply(key); + consumer.accept(columnPath, value); + } + } + } + + private final List> helpers = new ArrayList<>(); + + public ColumnConfigParser withColumnConfig( + String rootKey, Function function, BiConsumer consumer) { + helpers.add(new ConfigHelper(rootKey + '#', function, consumer)); + return this; + } + + public void parseConfig(Configuration conf) { + for (Map.Entry entry : conf) { + for (ConfigHelper helper : helpers) { + // We retrieve the value from function instead of parsing from the string here to + // use the exact + // implementations + // in Configuration + helper.processKey(entry.getKey()); + } + } + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java index 26d38a1c18636..da55f94942fd6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.parquet.writer; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.parquet.ColumnConfigParser; import org.apache.paimon.options.Options; import org.apache.paimon.types.RowType; @@ -46,33 +47,61 @@ public RowDataParquetBuilder(RowType rowType, Options options) { @Override public ParquetWriter createWriter(OutputFile out, String compression) throws IOException { - return new ParquetRowDataBuilder(out, rowType) - .withConf(conf) - .withCompressionCodec(CompressionCodecName.fromConf(getCompression(compression))) - .withRowGroupSize( - conf.getLong( - ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE)) - .withPageSize( - conf.getInt(ParquetOutputFormat.PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE)) - .withDictionaryPageSize( - conf.getInt( - ParquetOutputFormat.DICTIONARY_PAGE_SIZE, - ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE)) - .withMaxPaddingSize( - conf.getInt( - ParquetOutputFormat.MAX_PADDING_BYTES, - ParquetWriter.MAX_PADDING_SIZE_DEFAULT)) - .withDictionaryEncoding( - conf.getBoolean( - ParquetOutputFormat.ENABLE_DICTIONARY, - ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED)) - .withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false)) - .withWriterVersion( - ParquetProperties.WriterVersion.fromString( - conf.get( - ParquetOutputFormat.WRITER_VERSION, - ParquetProperties.DEFAULT_WRITER_VERSION.toString()))) - .build(); + ParquetRowDataBuilder builder = + new ParquetRowDataBuilder(out, rowType) + .withConf(conf) + .withCompressionCodec( + CompressionCodecName.fromConf(getCompression(compression))) + .withRowGroupSize( + conf.getLong( + ParquetOutputFormat.BLOCK_SIZE, + ParquetWriter.DEFAULT_BLOCK_SIZE)) + .withPageSize( + conf.getInt( + ParquetOutputFormat.PAGE_SIZE, + ParquetWriter.DEFAULT_PAGE_SIZE)) + .withDictionaryPageSize( + conf.getInt( + ParquetOutputFormat.DICTIONARY_PAGE_SIZE, + ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE)) + .withMaxPaddingSize( + conf.getInt( + ParquetOutputFormat.MAX_PADDING_BYTES, + ParquetWriter.MAX_PADDING_SIZE_DEFAULT)) + .withDictionaryEncoding( + conf.getBoolean( + ParquetOutputFormat.ENABLE_DICTIONARY, + ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED)) + .withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false)) + .withWriterVersion( + ParquetProperties.WriterVersion.fromString( + conf.get( + ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.DEFAULT_WRITER_VERSION + .toString()))) + .withBloomFilterEnabled( + conf.getBoolean( + ParquetOutputFormat.BLOOM_FILTER_ENABLED, + ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED)); + new ColumnConfigParser() + .withColumnConfig( + ParquetOutputFormat.ENABLE_DICTIONARY, + key -> conf.getBoolean(key, false), + builder::withDictionaryEncoding) + .withColumnConfig( + ParquetOutputFormat.BLOOM_FILTER_ENABLED, + key -> conf.getBoolean(key, false), + builder::withBloomFilterEnabled) + .withColumnConfig( + ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, + key -> conf.getLong(key, -1L), + builder::withBloomFilterNDV) + .withColumnConfig( + ParquetOutputFormat.BLOOM_FILTER_FPP, + key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP), + builder::withBloomFilterFPP) + .parseConfig(conf); + return builder.build(); } public String getCompression(String compression) { diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java index d5338b1e78be5..221d524fff5c9 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java @@ -18,10 +18,27 @@ package org.apache.paimon.format.parquet; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory; import org.apache.paimon.format.FormatReadWriteTest; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; /** A parquet {@link FormatReadWriteTest}. */ public class ParquetFormatReadWriteTest extends FormatReadWriteTest { @@ -35,4 +52,39 @@ protected FileFormat fileFormat() { return new ParquetFileFormat( new FileFormatFactory.FormatContext(new Options(), 1024, 1024)); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEnableBloomFilter(boolean enabled) throws Exception { + Options options = new Options(); + options.set("parquet.bloom.filter.enabled", String.valueOf(enabled)); + ParquetFileFormat format = + new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + + RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.BIGINT()); + + if (ThreadLocalRandom.current().nextBoolean()) { + rowType = (RowType) rowType.notNull(); + } + + PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd"); + writer.addElement(GenericRow.of(1, 1L)); + writer.addElement(GenericRow.of(2, 2L)); + writer.addElement(GenericRow.of(3, null)); + writer.close(); + out.close(); + + try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, file)) { + ParquetMetadata parquetMetadata = reader.getFooter(); + List blockMetaDataList = parquetMetadata.getBlocks(); + for (BlockMetaData blockMetaData : blockMetaDataList) { + List columnChunkMetaDataList = blockMetaData.getColumns(); + for (ColumnChunkMetaData columnChunkMetaData : columnChunkMetaDataList) { + BloomFilter filter = reader.readBloomFilter(columnChunkMetaData); + Assertions.assertThat(enabled == (filter != null)).isTrue(); + } + } + } + } }