diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index d393a9192523e..d3114cee6d76d 100644
--- a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -60,8 +60,8 @@ public abstract class FormatReadWriteTest {
private final String formatType;
- private FileIO fileIO;
- private Path file;
+ protected FileIO fileIO;
+ protected Path file;
protected FormatReadWriteTest(String formatType) {
this.formatType = formatType;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java
new file mode 100644
index 0000000000000..a4e33807f6ee5
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ColumnConfigParser.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * Parses the specified key-values in the format of root.key#column.path from a {@link
+ * Configuration} object.
+ *
+ * <p>NOTE: The file was copied from Apache parquet project.
+ */
+public class ColumnConfigParser {
+
+    private static class ConfigHelper<T> {
+        private final String prefix;
+        private final Function<String, T> function;
+        private final BiConsumer<String, T> consumer;
+
+        public ConfigHelper(
+                String prefix, Function<String, T> function, BiConsumer<String, T> consumer) {
+            this.prefix = prefix;
+            this.function = function;
+            this.consumer = consumer;
+        }
+
+        public void processKey(String key) {
+            if (key.startsWith(prefix)) {
+                String columnPath = key.substring(prefix.length());
+                T value = function.apply(key);
+                consumer.accept(columnPath, value);
+            }
+        }
+    }
+
+    private final List<ConfigHelper<?>> helpers = new ArrayList<>();
+
+    public <T> ColumnConfigParser withColumnConfig(
+            String rootKey, Function<String, T> function, BiConsumer<String, T> consumer) {
+        helpers.add(new ConfigHelper<T>(rootKey + '#', function, consumer));
+        return this;
+    }
+
+    public void parseConfig(Configuration conf) {
+        for (Map.Entry<String, String> entry : conf) {
+            for (ConfigHelper<?> helper : helpers) {
+                // Retrieve the typed value through the supplied function rather than
+                // parsing entry.getValue() ourselves, so that the exact parsing and
+                // default-value semantics implemented in Configuration (getBoolean,
+                // getLong, ...) are reused.
+                helper.processKey(entry.getKey());
+            }
+        }
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
index 26d38a1c18636..da55f94942fd6 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet.writer;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.parquet.ColumnConfigParser;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
@@ -46,33 +47,61 @@ public RowDataParquetBuilder(RowType rowType, Options options) {
     @Override
     public ParquetWriter<InternalRow> createWriter(OutputFile out, String compression)
             throws IOException {
-        return new ParquetRowDataBuilder(out, rowType)
-                .withConf(conf)
-                .withCompressionCodec(CompressionCodecName.fromConf(getCompression(compression)))
-                .withRowGroupSize(
-                        conf.getLong(
-                                ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE))
-                .withPageSize(
-                        conf.getInt(ParquetOutputFormat.PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE))
-                .withDictionaryPageSize(
-                        conf.getInt(
-                                ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
-                                ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
-                .withMaxPaddingSize(
-                        conf.getInt(
-                                ParquetOutputFormat.MAX_PADDING_BYTES,
-                                ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
-                .withDictionaryEncoding(
-                        conf.getBoolean(
-                                ParquetOutputFormat.ENABLE_DICTIONARY,
-                                ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
-                .withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
-                .withWriterVersion(
-                        ParquetProperties.WriterVersion.fromString(
-                                conf.get(
-                                        ParquetOutputFormat.WRITER_VERSION,
-                                        ParquetProperties.DEFAULT_WRITER_VERSION.toString())))
-                .build();
+        ParquetRowDataBuilder builder =
+                new ParquetRowDataBuilder(out, rowType)
+                        .withConf(conf)
+                        .withCompressionCodec(
+                                CompressionCodecName.fromConf(getCompression(compression)))
+                        .withRowGroupSize(
+                                conf.getLong(
+                                        ParquetOutputFormat.BLOCK_SIZE,
+                                        ParquetWriter.DEFAULT_BLOCK_SIZE))
+                        .withPageSize(
+                                conf.getInt(
+                                        ParquetOutputFormat.PAGE_SIZE,
+                                        ParquetWriter.DEFAULT_PAGE_SIZE))
+                        .withDictionaryPageSize(
+                                conf.getInt(
+                                        ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
+                                        ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
+                        .withMaxPaddingSize(
+                                conf.getInt(
+                                        ParquetOutputFormat.MAX_PADDING_BYTES,
+                                        ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
+                        .withDictionaryEncoding(
+                                conf.getBoolean(
+                                        ParquetOutputFormat.ENABLE_DICTIONARY,
+                                        ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED))
+                        .withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
+                        .withWriterVersion(
+                                ParquetProperties.WriterVersion.fromString(
+                                        conf.get(
+                                                ParquetOutputFormat.WRITER_VERSION,
+                                                ParquetProperties.DEFAULT_WRITER_VERSION
+                                                        .toString())))
+                        .withBloomFilterEnabled(
+                                conf.getBoolean(
+                                        ParquetOutputFormat.BLOOM_FILTER_ENABLED,
+                                        ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED));
+        new ColumnConfigParser()
+                .withColumnConfig(
+                        ParquetOutputFormat.ENABLE_DICTIONARY,
+                        key -> conf.getBoolean(key, false),
+                        builder::withDictionaryEncoding)
+                .withColumnConfig(
+                        ParquetOutputFormat.BLOOM_FILTER_ENABLED,
+                        key -> conf.getBoolean(key, false),
+                        builder::withBloomFilterEnabled)
+                .withColumnConfig(
+                        ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV,
+                        key -> conf.getLong(key, -1L),
+                        builder::withBloomFilterNDV)
+                .withColumnConfig(
+                        ParquetOutputFormat.BLOOM_FILTER_FPP,
+                        key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
+                        builder::withBloomFilterFPP)
+                .parseConfig(conf);
+        return builder.build();
     }
public String getCompression(String compression) {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index d5338b1e78be5..221d524fff5c9 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -18,10 +18,27 @@
package org.apache.paimon.format.parquet;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
/** A parquet {@link FormatReadWriteTest}. */
public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
@@ -35,4 +52,39 @@ protected FileFormat fileFormat() {
return new ParquetFileFormat(
new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
}
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEnableBloomFilter(boolean enabled) throws Exception {
+        Options options = new Options();
+        options.set("parquet.bloom.filter.enabled", String.valueOf(enabled));
+        ParquetFileFormat format =
+                new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.BIGINT());
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            rowType = (RowType) rowType.notNull();
+        }
+
+        PositionOutputStream out = fileIO.newOutputStream(file, false);
+        FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
+        writer.addElement(GenericRow.of(1, 1L));
+        writer.addElement(GenericRow.of(2, 2L));
+        writer.addElement(GenericRow.of(3, null));
+        writer.close();
+        out.close();
+
+        try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, file)) {
+            ParquetMetadata parquetMetadata = reader.getFooter();
+            List<BlockMetaData> blockMetaDataList = parquetMetadata.getBlocks();
+            for (BlockMetaData blockMetaData : blockMetaDataList) {
+                List<ColumnChunkMetaData> columnChunkMetaDataList = blockMetaData.getColumns();
+                for (ColumnChunkMetaData columnChunkMetaData : columnChunkMetaDataList) {
+                    BloomFilter filter = reader.readBloomFilter(columnChunkMetaData);
+                    // Assert with expected/actual so a failure reports which side differed.
+                    Assertions.assertThat(filter != null).isEqualTo(enabled);
+                }
+            }
+        }
+    }
}