Skip to content

Commit

Permalink
[format] Default compression of file to zstd with level 1
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jun 3, 2024
1 parent b648ed8 commit 26dcf5b
Show file tree
Hide file tree
Showing 32 changed files with 230 additions and 171 deletions.
34 changes: 26 additions & 8 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,18 @@
<td>Boolean</td>
<td>Force produce changelog in delete sql, or you can use 'streaming-read-overwrite' to read changelog from overwrite commit.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>deletion-vector.index-file.target-size</h5></td>
<td style="word-wrap: break-word;">2 mb</td>
<td>MemorySize</td>
<td>The target size of deletion vector index file.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -249,17 +249,29 @@
<td>The threshold for read file async.</td>
</tr>
<tr>
<td><h5>file.compression</h5></td>
<td><h5>file.block-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
<td>File block size of format, default value of orc stripe is 64 MB, and parquet row group is 128 MB.</td>
</tr>
<tr>
<td><h5>file.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by file.compression.per.level</td>
<td>Default file compression. It can be overridden by format own configuration.</td>
</tr>
<tr>
<td><h5>file.compression.per.level</h5></td>
<td style="word-wrap: break-word;"></td>
<td>Map</td>
<td>Define different compression policies for different level, you can add the conf like this: 'file.compression.per.level' = '0:lz4,1:zstd'.</td>
</tr>
<tr>
<td><h5>file.compression.zstd-level</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Default file compression zstd level. It can be overridden by format own configuration.</td>
</tr>
<tr>
<td><h5>file.format</h5></td>
<td style="word-wrap: break-word;">"orc"</td>
Expand Down Expand Up @@ -357,6 +369,12 @@
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Default file compression for manifest.</td>
</tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">"avro"</td>
Expand Down
12 changes: 0 additions & 12 deletions docs/layouts/shortcodes/generated/orc_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@
<td>Integer</td>
<td>Comma-separated list of fields for which dictionary encoding is to be skipped in orc.</td>
</tr>
<tr>
<td><h5>orc.compress</h5></td>
<td style="word-wrap: break-word;">"lz4"</td>
<td>String</td>
<td>Define the compression codec for ORC file, if a higher compression ratio is required, it is recommended to configure it as 'zstd', and you can configure: orc.compression.zstd.level</td>
</tr>
<tr>
<td><h5>orc.compression.zstd.level</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Define the compression level to use with ZStandard codec while writing data. The valid range is 1~22.</td>
</tr>
<tr>
<td><h5>orc.dictionary.key.threshold</h5></td>
<td style="word-wrap: break-word;">0.8</td>
Expand Down
28 changes: 26 additions & 2 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,23 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<String> FILE_COMPRESSION =
key("file.compression")
.stringType()
.defaultValue("zstd")
.withDescription(
"Default file compression. It can be overridden by format own configuration.");

public static final ConfigOption<Integer> FILE_COMPRESSION_ZSTD_LEVEL =
key("file.compression.zstd-level")
.intType()
.defaultValue(1)
.withDescription(
"Default file compression zstd level. It can be overridden by format own configuration.");

public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
key("file.block-size")
.memoryType()
.noDefaultValue()
.withDescription(
"Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by "
+ FILE_COMPRESSION_PER_LEVEL.key());
"File block size of format, default value of orc stripe is 64 MB, and parquet row group is 128 MB.");

public static final ConfigOption<MemorySize> FILE_INDEX_IN_MANIFEST_THRESHOLD =
key("file-index.in-manifest-threshold")
Expand All @@ -173,6 +186,12 @@ public class CoreOptions implements Serializable {
.defaultValue(CoreOptions.FILE_FORMAT_AVRO)
.withDescription("Specify the message format of manifest files.");

public static final ConfigOption<String> MANIFEST_COMPRESSION =
key("manifest.compression")
.stringType()
.defaultValue("zstd")
.withDescription("Default file compression for manifest.");

public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
key("manifest.target-file-size")
.memoryType()
Expand Down Expand Up @@ -1227,6 +1246,10 @@ public FileFormat manifestFormat() {
return createFileFormat(options, MANIFEST_FORMAT);
}

public String manifestCompression() {
return options.get(MANIFEST_COMPRESSION);
}

public MemorySize manifestTargetSize() {
return options.get(MANIFEST_TARGET_FILE_SIZE);
}
Expand Down Expand Up @@ -1325,6 +1348,7 @@ public boolean fieldCollectAggDistinct(String fieldName) {
.defaultValue(false));
}

@Nullable
public String fileCompression() {
return options.get(FILE_COMPRESSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ private static Optional<FileFormat> fromIdentifier(
}

public static FileFormat getFileFormat(Options options, String formatIdentifier) {
int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE);
return FileFormat.fromIdentifier(
formatIdentifier,
new FormatContext(options.removePrefix(formatIdentifier + "."), readBatchSize));
FormatContext context =
new FormatContext(
options.removePrefix(formatIdentifier + "."),
options.get(CoreOptions.READ_BATCH_SIZE),
options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL),
options.get(CoreOptions.FILE_BLOCK_SIZE));
return FileFormat.fromIdentifier(formatIdentifier, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

package org.apache.paimon.format;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

import javax.annotation.Nullable;

/** Factory to create {@link FileFormat}. */
public interface FileFormatFactory {

Expand All @@ -29,12 +33,26 @@ public interface FileFormatFactory {

/** the format context. */
class FormatContext {

private final Options formatOptions;
private final int readBatchSize;
private final int zstdLevel;
@Nullable private final MemorySize blockSize;

@VisibleForTesting
public FormatContext(Options formatOptions, int readBatchSize) {
this(formatOptions, readBatchSize, 1, null);
}

public FormatContext(
Options formatOptions,
int readBatchSize,
int zstdLevel,
@Nullable MemorySize blockSize) {
this.formatOptions = formatOptions;
this.readBatchSize = readBatchSize;
this.zstdLevel = zstdLevel;
this.blockSize = blockSize;
}

public Options formatOptions() {
Expand All @@ -44,5 +62,14 @@ public Options formatOptions() {
public int readBatchSize() {
return readBatchSize;
}

public int zstdLevel() {
return zstdLevel;
}

@Nullable
public MemorySize blockSize() {
return blockSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.paimon.fs.PositionOutputStream;

import javax.annotation.Nullable;

import java.io.IOException;

/** A factory to create {@link FormatWriter} for file. */
Expand All @@ -35,5 +33,5 @@ public interface FormatWriterFactory {
* @throws IOException Thrown if the writer cannot be opened, or if the output stream throws an
* exception.
*/
FormatWriter create(PositionOutputStream out, @Nullable String compression) throws IOException;
FormatWriter create(PositionOutputStream out, String compression) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ protected ManifestFile.Factory manifestFileFactory(boolean forWrite) {
schemaManager,
partitionType,
options.manifestFormat(),
options.manifestCompression(),
pathFactory(),
options.manifestTargetSize().getBytes(),
forWrite ? writeManifestCache : null);
Expand All @@ -132,12 +133,14 @@ protected ManifestList.Factory manifestListFactory(boolean forWrite) {
return new ManifestList.Factory(
fileIO,
options.manifestFormat(),
options.manifestCompression(),
pathFactory(),
forWrite ? writeManifestCache : null);
}

protected IndexManifestFile.Factory indexManifestFileFactory() {
return new IndexManifestFile.Factory(fileIO, options.manifestFormat(), pathFactory());
return new IndexManifestFile.Factory(
fileIO, options.manifestFormat(), options.manifestCompression(), pathFactory());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ private IndexManifestFile(
FileIO fileIO,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory) {
super(
fileIO,
new IndexManifestEntrySerializer(),
readerFactory,
writerFactory,
compression,
pathFactory,
null);
}
Expand All @@ -68,11 +70,17 @@ public static class Factory {

private final FileIO fileIO;
private final FileFormat fileFormat;
private final String compression;
private final FileStorePathFactory pathFactory;

public Factory(FileIO fileIO, FileFormat fileFormat, FileStorePathFactory pathFactory) {
public Factory(
FileIO fileIO,
FileFormat fileFormat,
String compression,
FileStorePathFactory pathFactory) {
this.fileIO = fileIO;
this.fileFormat = fileFormat;
this.compression = compression;
this.pathFactory = pathFactory;
}

Expand All @@ -82,6 +90,7 @@ public IndexManifestFile create() {
fileIO,
fileFormat.createReaderFactory(schema),
fileFormat.createWriterFactory(schema),
compression,
pathFactory.indexManifestFileFactory());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.manifest;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
Expand Down Expand Up @@ -60,10 +59,11 @@ private ManifestFile(
ManifestEntrySerializer serializer,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory,
long suggestedFileSize,
@Nullable SegmentsCache<String> cache) {
super(fileIO, serializer, readerFactory, writerFactory, pathFactory, cache);
super(fileIO, serializer, readerFactory, writerFactory, compression, pathFactory, cache);
this.schemaManager = schemaManager;
this.partitionType = partitionType;
this.writerFactory = writerFactory;
Expand Down Expand Up @@ -93,11 +93,7 @@ public List<ManifestFileMeta> write(List<ManifestEntry> entries) {

public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter() {
return new RollingFileWriter<>(
() ->
new ManifestEntryWriter(
writerFactory,
pathFactory.newPath(),
CoreOptions.FILE_COMPRESSION.defaultValue()),
() -> new ManifestEntryWriter(writerFactory, pathFactory.newPath(), compression),
suggestedFileSize);
}

Expand Down Expand Up @@ -157,6 +153,7 @@ public static class Factory {
private final SchemaManager schemaManager;
private final RowType partitionType;
private final FileFormat fileFormat;
private final String compression;
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;
@Nullable private final SegmentsCache<String> cache;
Expand All @@ -166,13 +163,15 @@ public Factory(
SchemaManager schemaManager,
RowType partitionType,
FileFormat fileFormat,
String compression,
FileStorePathFactory pathFactory,
long suggestedFileSize,
@Nullable SegmentsCache<String> cache) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.partitionType = partitionType;
this.fileFormat = fileFormat;
this.compression = compression;
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
this.cache = cache;
Expand All @@ -187,6 +186,7 @@ public ManifestFile create() {
new ManifestEntrySerializer(),
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
compression,
pathFactory.manifestFileFactory(),
suggestedFileSize,
cache);
Expand All @@ -199,6 +199,7 @@ public ObjectsFile<SimpleFileEntry> createSimpleFileEntryReader() {
new SimpleFileEntrySerializer(),
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
compression,
pathFactory.manifestFileFactory(),
cache);
}
Expand Down
Loading

0 comments on commit 26dcf5b

Please sign in to comment.