Skip to content

Commit

Permalink
[parquet] Pass conf to parquet writer (#3435)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored May 30, 2024
1 parent a9a09f5 commit 4cbb443
Showing 1 changed file with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetWriter;
Expand All @@ -36,31 +37,31 @@
public class RowDataParquetBuilder implements ParquetBuilder<InternalRow> {

private final RowType rowType;
private final Options conf;
private final Configuration conf;

public RowDataParquetBuilder(RowType rowType, Options conf) {
public RowDataParquetBuilder(RowType rowType, Options options) {
this.rowType = rowType;
this.conf = conf;
this.conf = new Configuration(false);
options.toMap().forEach(conf::set);
}

@Override
public ParquetWriter<InternalRow> createWriter(OutputFile out, String compression)
throws IOException {

return new ParquetRowDataBuilder(out, rowType)
.withConf(conf)
.withCompressionCodec(CompressionCodecName.fromConf(getCompression(compression)))
.withRowGroupSize(
conf.getLong(
ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE))
.withPageSize(
conf.getInteger(
ParquetOutputFormat.PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE))
conf.getInt(ParquetOutputFormat.PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE))
.withDictionaryPageSize(
conf.getInteger(
conf.getInt(
ParquetOutputFormat.DICTIONARY_PAGE_SIZE,
ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE))
.withMaxPaddingSize(
conf.getInteger(
conf.getInt(
ParquetOutputFormat.MAX_PADDING_BYTES,
ParquetWriter.MAX_PADDING_SIZE_DEFAULT))
.withDictionaryEncoding(
Expand All @@ -70,7 +71,7 @@ public ParquetWriter<InternalRow> createWriter(OutputFile out, String compressio
.withValidation(conf.getBoolean(ParquetOutputFormat.VALIDATION, false))
.withWriterVersion(
ParquetProperties.WriterVersion.fromString(
conf.getString(
conf.get(
ParquetOutputFormat.WRITER_VERSION,
ParquetProperties.DEFAULT_WRITER_VERSION.toString())))
.build();
Expand All @@ -82,8 +83,7 @@ public String getCompression(@Nullable String compression) {
compressName = compression;
} else {
compressName =
conf.getString(
ParquetOutputFormat.COMPRESSION, CompressionCodecName.SNAPPY.name());
conf.get(ParquetOutputFormat.COMPRESSION, CompressionCodecName.SNAPPY.name());
}
return compressName;
}
Expand Down

0 comments on commit 4cbb443

Please sign in to comment.