Skip to content

Commit

Permalink
[parquet] Support to enable parquet bloomfilter (#4479)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Nov 8, 2024
1 parent 03abd94 commit fa308f6
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public abstract class FormatReadWriteTest {

private final String formatType;

private FileIO fileIO;
private Path file;
protected FileIO fileIO;
protected Path file;

protected FormatReadWriteTest(String formatType) {
this.formatType = formatType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.parquet;

import org.apache.hadoop.conf.Configuration;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * Collects per-column configuration handlers and applies them to the keys of a {@link
 * Configuration}. A key is handled when it has the form {@code root.key#column.path}.
 *
 * <p>NOTE: This file was copied from the Apache Parquet project.
 */
public class ColumnConfigParser {

    /** Handlers registered so far; each configuration key is offered to them in this order. */
    private final List<ConfigHelper<?>> helpers = new ArrayList<>();

    /**
     * Registers a handler for every configuration key that starts with {@code rootKey + '#'}.
     *
     * @param rootKey the key prefix, without the trailing {@code '#'}
     * @param function called with the full configuration key to obtain the value
     * @param consumer called with the column path (the key text after the {@code '#'}) and the
     *     value produced by {@code function}
     * @return this parser, so that registrations can be chained
     */
    public <T> ColumnConfigParser withColumnConfig(
            String rootKey, Function<String, T> function, BiConsumer<String, T> consumer) {
        ConfigHelper<T> helper = new ConfigHelper<>(rootKey + '#', function, consumer);
        helpers.add(helper);
        return this;
    }

    /**
     * Offers every key found in {@code conf} to each registered handler.
     *
     * @param conf the configuration whose entries are scanned
     */
    public void parseConfig(Configuration conf) {
        for (Map.Entry<String, String> entry : conf) {
            String key = entry.getKey();
            // Only the key is passed on: the value is obtained through the registered function
            // instead of being parsed from the entry's string, so that the exact
            // implementations in Configuration are used.
            helpers.forEach(helper -> helper.processKey(key));
        }
    }

    /** Couples a key prefix with the value reader and the per-column sink registered for it. */
    private static class ConfigHelper<T> {
        private final String prefix;
        private final Function<String, T> function;
        private final BiConsumer<String, T> consumer;

        public ConfigHelper(
                String prefix, Function<String, T> function, BiConsumer<String, T> consumer) {
            this.prefix = prefix;
            this.function = function;
            this.consumer = consumer;
        }

        /** Forwards the column path and value of {@code key} if it starts with the prefix. */
        public void processKey(String key) {
            if (!key.startsWith(prefix)) {
                return;
            }
            String columnPath = key.substring(prefix.length());
            consumer.accept(columnPath, function.apply(key));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet.writer;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.parquet.ColumnConfigParser;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;

Expand Down Expand Up @@ -46,33 +47,61 @@ public RowDataParquetBuilder(RowType rowType, Options options) {
/**
 * Creates a {@link ParquetWriter} of {@link InternalRow} for the given output file, taking the
 * writer settings from {@code conf} and the codec from {@code getCompression(compression)}.
 *
 * <p>NOTE(review): this span is a rendered diff hunk without +/- markers. The first statement
 * (a single {@code return new ParquetRowDataBuilder(...)...build();}) appears to be the
 * pre-change body, and the statements starting at {@code ParquetRowDataBuilder builder =}
 * appear to be the post-change body - confirm against the commit.
 *
 * @param out the file the writer writes to
 * @param compression the compression name passed to {@code getCompression}
 * @return the writer produced by the builder
 * @throws IOException declared by this method; presumably raised while building the writer
 */
@Override
public ParquetWriter<InternalRow> createWriter(OutputFile out, String compression)
throws IOException {
// Variant 1: configure the builder from conf and build the writer in one expression.
return new ParquetRowDataBuilder(out, rowType)
.withConf(conf)
.withCompressionCodec(CompressionCodecName.fromConf(getCompression(compression)))
.withRowGroupSize(
conf.getLong(
ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE))
.withPageSize(
conf.getInt(ParquetOutputFormat.PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE))
.withDictionaryPageSize(
conf.getInt(
ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
.withMaxPaddingSize(
conf.getInt(
ParquetOutputFormat.MAX_PADDING_BYTES,
ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
.withDictionaryEncoding(
conf.getBoolean(
ParquetOutputFormat.ENABLE_DICTIONARY,
ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
.withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
.withWriterVersion(
ParquetProperties.WriterVersion.fromString(
conf.get(
ParquetOutputFormat.WRITER_VERSION,
ParquetProperties.DEFAULT_WRITER_VERSION.toString())))
.build();
// Variant 2: the same settings as above, but the builder is kept in a local so that the
// bloom filter switch and the per-column settings below can be applied before build().
ParquetRowDataBuilder builder =
new ParquetRowDataBuilder(out, rowType)
.withConf(conf)
.withCompressionCodec(
CompressionCodecName.fromConf(getCompression(compression)))
.withRowGroupSize(
conf.getLong(
ParquetOutputFormat.BLOCK_SIZE,
ParquetWriter.DEFAULT_BLOCK_SIZE))
.withPageSize(
conf.getInt(
ParquetOutputFormat.PAGE_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE))
.withDictionaryPageSize(
conf.getInt(
ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
.withMaxPaddingSize(
conf.getInt(
ParquetOutputFormat.MAX_PADDING_BYTES,
ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
.withDictionaryEncoding(
conf.getBoolean(
ParquetOutputFormat.ENABLE_DICTIONARY,
ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
.withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
.withWriterVersion(
ParquetProperties.WriterVersion.fromString(
conf.get(
ParquetOutputFormat.WRITER_VERSION,
ParquetProperties.DEFAULT_WRITER_VERSION
.toString())))
// Bloom filter switch read from the un-suffixed key, with the ParquetProperties default.
.withBloomFilterEnabled(
conf.getBoolean(
ParquetOutputFormat.BLOOM_FILTER_ENABLED,
ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED));
// Per-column settings: for each conf key of the form <root key>#<column path>, the value is
// read from conf with the fallback given in the lambda and passed, together with the column
// path, to the two-argument builder method referenced below.
new ColumnConfigParser()
.withColumnConfig(
ParquetOutputFormat.ENABLE_DICTIONARY,
key -> conf.getBoolean(key, false),
builder::withDictionaryEncoding)
.withColumnConfig(
ParquetOutputFormat.BLOOM_FILTER_ENABLED,
key -> conf.getBoolean(key, false),
builder::withBloomFilterEnabled)
.withColumnConfig(
ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV,
key -> conf.getLong(key, -1L),
builder::withBloomFilterNDV)
.withColumnConfig(
ParquetOutputFormat.BLOOM_FILTER_FPP,
key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
builder::withBloomFilterFPP)
.parseConfig(conf);
return builder.build();
}

public String getCompression(String compression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,27 @@

package org.apache.paimon.format.parquet;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** A parquet {@link FormatReadWriteTest}. */
public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
Expand All @@ -35,4 +52,39 @@ protected FileFormat fileFormat() {
return new ParquetFileFormat(
new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
}

/**
 * Writes a three-row Parquet file with {@code parquet.bloom.filter.enabled} set to {@code
 * enabled}, then checks that every column chunk listed in the footer has a bloom filter exactly
 * when the option is enabled.
 *
 * @param enabled the value written to the {@code parquet.bloom.filter.enabled} option
 * @throws Exception if writing or reading the file fails
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEnableBloomFilter(boolean enabled) throws Exception {
    Options options = new Options();
    options.set("parquet.bloom.filter.enabled", String.valueOf(enabled));
    ParquetFileFormat format =
            new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));

    RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.BIGINT());

    // Randomly run with either the row type as built above or its notNull() variant.
    if (ThreadLocalRandom.current().nextBoolean()) {
        rowType = (RowType) rowType.notNull();
    }

    // try/finally so the writer and the stream are released even when writing fails; on the
    // success path the close order (writer first, then stream) is unchanged.
    PositionOutputStream out = fileIO.newOutputStream(file, false);
    try {
        FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
        try {
            writer.addElement(GenericRow.of(1, 1L));
            writer.addElement(GenericRow.of(2, 2L));
            writer.addElement(GenericRow.of(3, null));
        } finally {
            writer.close();
        }
    } finally {
        out.close();
    }

    try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, file)) {
        ParquetMetadata parquetMetadata = reader.getFooter();
        List<BlockMetaData> blockMetaDataList = parquetMetadata.getBlocks();
        for (BlockMetaData blockMetaData : blockMetaDataList) {
            List<ColumnChunkMetaData> columnChunkMetaDataList = blockMetaData.getColumns();
            for (ColumnChunkMetaData columnChunkMetaData : columnChunkMetaDataList) {
                BloomFilter filter = reader.readBloomFilter(columnChunkMetaData);
                // Compare presence against the flag directly and name the column, so a
                // failure reports which column was wrong and what was expected.
                Assertions.assertThat(filter != null)
                        .as("bloom filter present for column %s", columnChunkMetaData.getPath())
                        .isEqualTo(enabled);
            }
        }
    }
}
}

0 comments on commit fa308f6

Please sign in to comment.