Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[format] Format prefix in option keys are processed in each format #4245

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@ public Integer getLocalSampleMagnification() {

/**
 * Creates the {@link FileFormat} configured under {@code formatOption}, normalizing the
 * configured identifier (via {@code normalizeFileFormat}) before dispatching.
 */
public static FileFormat createFileFormat(Options options, ConfigOption<String> formatOption) {
    String formatIdentifier = normalizeFileFormat(options.get(formatOption));
    return FileFormat.fromIdentifier(formatIdentifier, options);
}

public Map<Integer, String> fileCompressionPerLevel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
Expand All @@ -29,7 +28,9 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

Expand Down Expand Up @@ -74,9 +75,15 @@ public Optional<SimpleStatsExtractor> createStatsExtractor(
return Optional.empty();
}

@VisibleForTesting
public static FileFormat fromIdentifier(String identifier, Options options) {
return fromIdentifier(identifier, new FormatContext(options, 1024, 1024));
return fromIdentifier(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This called method fromIdentifier should be private.

identifier,
new FormatContext(
options,
options.get(CoreOptions.READ_BATCH_SIZE),
options.get(CoreOptions.WRITE_BATCH_SIZE),
options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL),
options.get(CoreOptions.FILE_BLOCK_SIZE)));
}

/** Create a {@link FileFormat} from format identifier and format options. */
Expand All @@ -103,14 +110,14 @@ private static Optional<FileFormat> fromIdentifier(
return Optional.empty();
}

public static FileFormat getFileFormat(Options options, String formatIdentifier) {
FormatContext context =
new FormatContext(
options.removePrefix(formatIdentifier + "."),
options.get(CoreOptions.READ_BATCH_SIZE),
options.get(CoreOptions.WRITE_BATCH_SIZE),
options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL),
options.get(CoreOptions.FILE_BLOCK_SIZE));
return FileFormat.fromIdentifier(formatIdentifier, context);
/**
 * Extracts the options whose keys start with {@code "<formatIdentifier>."} (matched
 * case-insensitively) and returns them with the matched prefix normalized to lower case.
 * The remainder of each key is kept as-is.
 */
protected Options getIdentifierPrefixOptions(Options options) {
    Map<String, String> result = new HashMap<>();
    // Locale.ROOT gives locale-independent case mapping for config keys
    // (avoids e.g. the Turkish dotless-i pitfall of the default locale).
    String prefix = formatIdentifier.toLowerCase(Locale.ROOT) + ".";
    for (String key : options.keySet()) {
        if (key.toLowerCase(Locale.ROOT).startsWith(prefix)) {
            result.put(prefix + key.substring(prefix.length()), options.get(key));
        }
    }
    return new Options(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,32 @@ public interface FileFormatFactory {
/** the format context. */
class FormatContext {

private final Options formatOptions;
private final Options options;
private final int readBatchSize;
private final int writeBatchSize;
private final int zstdLevel;
@Nullable private final MemorySize blockSize;

@VisibleForTesting
public FormatContext(Options formatOptions, int readBatchSize, int writeBatchSize) {
this(formatOptions, readBatchSize, writeBatchSize, 1, null);
/**
 * Convenience constructor that defaults the zstd compression level to 1 and leaves the
 * block size unset (see the full constructor this delegates to).
 */
public FormatContext(Options options, int readBatchSize, int writeBatchSize) {
this(options, readBatchSize, writeBatchSize, 1, null);
}

/**
 * Creates a format context.
 *
 * @param options the options carried by this context
 * @param readBatchSize batch size used when reading
 * @param writeBatchSize batch size used when writing
 * @param zstdLevel zstd compression level
 * @param blockSize file block size, or {@code null} if unset
 */
public FormatContext(
        Options options,
        int readBatchSize,
        int writeBatchSize,
        int zstdLevel,
        @Nullable MemorySize blockSize) {
    this.options = options;
    this.readBatchSize = readBatchSize;
    this.writeBatchSize = writeBatchSize;
    this.zstdLevel = zstdLevel;
    this.blockSize = blockSize;
}

public Options formatOptions() {
return formatOptions;
/** Returns the {@link Options} this context was constructed with. */
public Options options() {
return options;
}

public int readBatchSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public FileFormat discover(String identifier) {
}

/** Instantiates the {@link FileFormat} for the given identifier from the current options. */
private FileFormat create(String identifier) {
    return FileFormat.fromIdentifier(identifier, options.toConfiguration());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static IcebergManifestFile create(FileStoreTable table, IcebergPathFactor
"org.apache.paimon.avro.generated.record:manifest_entry,"
+ "manifest_entry_data_file:r2,"
+ "r2_partition:r102");
FileFormat manifestFileAvro = FileFormat.getFileFormat(manifestFileAvroOptions, "avro");
FileFormat manifestFileAvro = FileFormat.fromIdentifier("avro", manifestFileAvroOptions);
return new IcebergManifestFile(
table.fileIO(),
partitionType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static IcebergManifestList create(FileStoreTable table, IcebergPathFactor
"avro.row-name-mapping",
"org.apache.paimon.avro.generated.record:manifest_file,"
+ "manifest_file_partitions:r508");
FileFormat manifestListAvro = FileFormat.getFileFormat(manifestListAvroOptions, "avro");
FileFormat manifestListAvro = FileFormat.fromIdentifier("avro", manifestListAvroOptions);
return new IcebergManifestList(
table.fileIO(),
manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ private WriteFormatContext(
format,
parentFactories.get(format).createDataFilePathFactory(partition, bucket));

FileFormat fileFormat = FileFormat.getFileFormat(options.toConfiguration(), format);
FileFormat fileFormat =
FileFormat.fromIdentifier(format, options.toConfiguration());
// In avro format, minValue, maxValue, and nullCount are not counted, set
// StatsExtractor is Optional.empty() and will use SimpleStatsExtractor to collect
// stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ private static DataFileMeta constructFileMeta(
table.rowType().getFieldNames());

SimpleStatsExtractor simpleStatsExtractor =
FileFormat.getFileFormat(
((FileStoreTable) table).coreOptions().toConfiguration(),
format)
FileFormat.fromIdentifier(
format,
((FileStoreTable) table).coreOptions().toConfiguration())
.createStatsExtractor(table.rowType(), factories)
.orElseThrow(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ public class FileFormatTest {
public void testWriteRead(@TempDir java.nio.file.Path tempDir) throws IOException {
FileFormat avro = createFileFormat("snappy");
RowType rowType = RowType.of(new IntType(), new IntType());

Path path = new Path(tempDir.toUri().toString(), "1.avro");
// write

// write
List<InternalRow> expected = new ArrayList<>();
expected.add(GenericRow.of(1, 11));
expected.add(GenericRow.of(2, 22));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,23 @@ public class AvroFileFormat extends FileFormat {

public static final String IDENTIFIER = "avro";

public static final ConfigOption<String> AVRO_OUTPUT_CODEC =
ConfigOptions.key("codec")
private static final ConfigOption<String> AVRO_OUTPUT_CODEC =
ConfigOptions.key("avro.codec")
.stringType()
.defaultValue(SNAPPY_CODEC)
.withDescription("The compression codec for avro");

public static final ConfigOption<Map<String, String>> AVRO_ROW_NAME_MAPPING =
ConfigOptions.key("row-name-mapping").mapType().defaultValue(new HashMap<>());
private static final ConfigOption<Map<String, String>> AVRO_ROW_NAME_MAPPING =
ConfigOptions.key("avro.row-name-mapping").mapType().defaultValue(new HashMap<>());

private final FormatContext context;
private final Options options;
private final int zstdLevel;

/**
 * Creates the avro format from the given context, keeping only the {@code avro.}-prefixed
 * options and the configured zstd compression level.
 */
public AvroFileFormat(FormatContext context) {
    super(IDENTIFIER);

    this.options = getIdentifierPrefixOptions(context.options());
    this.zstdLevel = context.zstdLevel();
}

@Override
Expand Down Expand Up @@ -95,13 +98,12 @@ public void validateDataFields(RowType rowType) {
}

/**
 * Resolves the avro codec: an explicitly configured {@code avro.codec} wins; otherwise
 * "zstd" uses the context's zstd level, and anything else falls back to the option's
 * default codec.
 */
private CodecFactory createCodecFactory(String compression) {
    if (options.contains(AVRO_OUTPUT_CODEC)) {
        return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC));
    }

    if (compression.equalsIgnoreCase("zstd")) {
        return CodecFactory.zstandardCodec(zstdLevel);
    }
    // No explicit codec configured and not zstd: use the option's default value.
    return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC));
}
Expand All @@ -117,8 +119,7 @@ private RowAvroWriterFactory(RowType rowType) {
(out, compression) -> {
Schema schema =
AvroSchemaConverter.convertToSchema(
rowType,
context.formatOptions().get(AVRO_ROW_NAME_MAPPING));
rowType, options.get(AVRO_ROW_NAME_MAPPING));
AvroRowDatumWriter datumWriter = new AvroRowDatumWriter(rowType);
DataFileWriter<InternalRow> dataFileWriter =
new DataFileWriter<>(datumWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class OrcFileFormat extends FileFormat {

public OrcFileFormat(FormatContext formatContext) {
super(IDENTIFIER);
this.orcProperties = getOrcProperties(formatContext.formatOptions(), formatContext);
this.orcProperties = getOrcProperties(formatContext.options(), formatContext);
this.readerConf = new org.apache.hadoop.conf.Configuration();
this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString()));
this.writerConf = new org.apache.hadoop.conf.Configuration();
Expand Down Expand Up @@ -145,12 +145,9 @@ public FormatWriterFactory createWriterFactory(RowType type) {
return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
}

private static Properties getOrcProperties(Options options, FormatContext formatContext) {
private Properties getOrcProperties(Options options, FormatContext formatContext) {
Properties orcProperties = new Properties();

Properties properties = new Properties();
options.addAllToProperties(properties);
properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v));
orcProperties.putAll(getIdentifierPrefixOptions(options).toMap());

if (!orcProperties.containsKey(OrcConf.COMPRESSION_ZSTD_LEVEL.getAttribute())) {
orcProperties.setProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,31 @@
/** Parquet {@link FileFormat}. */
public class ParquetFileFormat extends FileFormat {

private final FormatContext formatContext;
private final Options options;
private final int readBatchSize;

/**
 * Creates the parquet format from the given context, precomputing the parquet-prefixed
 * options and caching the read batch size.
 */
public ParquetFileFormat(FormatContext formatContext) {
    super(IDENTIFIER);

    this.options = getParquetConfiguration(formatContext);
    this.readBatchSize = formatContext.readBatchSize();
}

@VisibleForTesting
Options formatOptions() {
return formatContext.formatOptions();
Options getOptions() {
return options;
}

@Override
public FormatReaderFactory createReaderFactory(
RowType projectedRowType, List<Predicate> filters) {
return new ParquetReaderFactory(
getParquetConfiguration(formatContext),
projectedRowType,
formatContext.readBatchSize(),
ParquetFilters.convert(filters));
options, projectedRowType, readBatchSize, ParquetFilters.convert(filters));
}

@Override
public FormatWriterFactory createWriterFactory(RowType type) {
    // The writer is configured with the precomputed parquet-prefixed options.
    return new ParquetWriterFactory(new RowDataParquetBuilder(type, options));
}

@Override
Expand All @@ -81,11 +80,8 @@ public Optional<SimpleStatsExtractor> createStatsExtractor(
return Optional.of(new ParquetSimpleStatsExtractor(type, statsCollectors));
}

public static Options getParquetConfiguration(FormatContext context) {
Options parquetOptions = new Options();
context.formatOptions()
.toMap()
.forEach((key, value) -> parquetOptions.setString(IDENTIFIER + "." + key, value));
private Options getParquetConfiguration(FormatContext context) {
Options parquetOptions = getIdentifierPrefixOptions(context.options());

if (!parquetOptions.containsKey("parquet.compression.codec.zstd.level")) {
parquetOptions.set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class OrcFileFormatTest {
@Test
public void testAbsent() {
Options options = new Options();
options.setString("haha", "1");
options.setString("orc.haha", "1");
OrcFileFormat orc =
new OrcFileFormatFactory().create(new FormatContext(options, 1024, 1024));
assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
Expand All @@ -47,8 +47,8 @@ public void testAbsent() {
@Test
public void testPresent() {
Options options = new Options();
options.setString("haha", "1");
options.setString("compress", "zlib");
options.setString("orc.haha", "1");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change haha to other normal orc option?

options.setString("orc.compress", "zlib");
OrcFileFormat orc =
new OrcFileFormatFactory().create(new FormatContext(options, 1024, 1024));
assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class OrcBulkWriterTest {
void testRowBatch(@TempDir java.nio.file.Path tempDir) throws IOException {
Options options = new Options();
options.set(CoreOptions.WRITE_BATCH_SIZE, 1);
FileFormat orc = FileFormat.getFileFormat(options, "orc");
FileFormat orc = FileFormat.fromIdentifier("orc", options);
Assertions.assertThat(orc).isInstanceOf(OrcFileFormat.class);

RowType rowType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ class OrcZstdTest {
@Test
void testWriteOrcWithZstd(@TempDir java.nio.file.Path tempDir) throws IOException {
Options options = new Options();
options.set("compress", "zstd");
options.set("stripe.size", "31457280");
options.set("compression.zstd.level", "1");
options.set("orc.compress", "zstd");
options.set("orc.stripe.size", "31457280");
options.set("orc.compression.zstd.level", "1");
OrcFileFormat orc =
new OrcFileFormatFactory()
.create(new FileFormatFactory.FormatContext(options, 1024, 1024));
Expand Down Expand Up @@ -92,9 +92,9 @@ void testWriteOrcWithZstd(@TempDir java.nio.file.Path tempDir) throws IOExceptio
Assertions.assertThat(formatWriter).isInstanceOf(OrcBulkWriter.class);

Options optionsWithLowLevel = new Options();
optionsWithLowLevel.set("compress", "zstd");
optionsWithLowLevel.set("stripe.size", "31457280");
optionsWithLowLevel.set("compression.zstd.level", "1");
optionsWithLowLevel.set("orc.compress", "zstd");
optionsWithLowLevel.set("orc.stripe.size", "31457280");
optionsWithLowLevel.set("orc.compression.zstd.level", "1");

Random random = new Random();
for (int i = 0; i < 1000; i++) {
Expand Down
Loading
Loading