Skip to content

Commit

Permalink
[format] Default compression of file to zstd with level 1 (#3463)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jun 4, 2024
1 parent eff9575 commit df06cd1
Show file tree
Hide file tree
Showing 39 changed files with 266 additions and 220 deletions.
4 changes: 1 addition & 3 deletions docs/content/append-table/append-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ CREATE TABLE my_table (
price DOUBLE,
sales BIGINT
) WITH (
'file.compression' = 'zstd'
'file.compression.zstd-level' = '3'
);
```
{{< /tab >}}
{{< /tabs >}}

The recommended compression for the Append table is `'zstd'`.

## Automatic small file merging

In streaming writing job, without bucket definition, there is no compaction in writer, instead, will use
Expand Down
6 changes: 2 additions & 4 deletions docs/content/maintenance/write-performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,9 @@ layers to be in Avro format.

## File Compression

By default, Paimon uses high-performance compression algorithms such as LZ4 and SNAPPY, but their compression rates
are not so good. If you want to reduce the write/read performance, you can modify the compression algorithm:
By default, Paimon uses zstd with level 1, you can modify the compression algorithm:

1. `'file.compression'`: Default file compression format. If you need a higher compression rate, I recommend using `'ZSTD'`.
2. `'file.compression.per.level'`: Define different compression policies for different level. For example `'0:lz4,1:zstd'`.
`'file.compression.zstd-level'`: Default zstd level is 1. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.

## Stability

Expand Down
34 changes: 26 additions & 8 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,18 @@
<td>Boolean</td>
<td>Force produce changelog in delete sql, or you can use 'streaming-read-overwrite' to read changelog from overwrite commit.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>deletion-vector.index-file.target-size</h5></td>
<td style="word-wrap: break-word;">2 mb</td>
<td>MemorySize</td>
<td>The target size of deletion vector index file.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -249,17 +249,29 @@
<td>The threshold for read file async.</td>
</tr>
<tr>
<td><h5>file.compression</h5></td>
<td><h5>file.block-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
<td>File block size of format, default value of orc stripe is 64 MB, and parquet row group is 128 MB.</td>
</tr>
<tr>
<td><h5>file.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by file.compression.per.level</td>
<td>Default file compression. For faster read and write, it is recommended to use LZ4.</td>
</tr>
<tr>
<td><h5>file.compression.per.level</h5></td>
<td style="word-wrap: break-word;"></td>
<td>Map</td>
<td>Define different compression policies for different level, you can add the conf like this: 'file.compression.per.level' = '0:lz4,1:zstd'.</td>
</tr>
<tr>
<td><h5>file.compression.zstd-level</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Default file compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.</td>
</tr>
<tr>
<td><h5>file.format</h5></td>
<td style="word-wrap: break-word;">"orc"</td>
Expand Down Expand Up @@ -357,6 +369,12 @@
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Default file compression for manifest.</td>
</tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">"avro"</td>
Expand Down
12 changes: 0 additions & 12 deletions docs/layouts/shortcodes/generated/orc_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@
<td>Integer</td>
<td>Comma-separated list of fields for which dictionary encoding is to be skipped in orc.</td>
</tr>
<tr>
<td><h5>orc.compress</h5></td>
<td style="word-wrap: break-word;">"lz4"</td>
<td>String</td>
<td>Define the compression codec for ORC file, if a higher compression ratio is required, it is recommended to configure it as 'zstd', and you can configure: orc.compression.zstd.level</td>
</tr>
<tr>
<td><h5>orc.compression.zstd.level</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Define the compression level to use with ZStandard codec while writing data. The valid range is 1~22.</td>
</tr>
<tr>
<td><h5>orc.dictionary.key.threshold</h5></td>
<td style="word-wrap: break-word;">0.8</td>
Expand Down
28 changes: 26 additions & 2 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,23 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<String> FILE_COMPRESSION =
key("file.compression")
.stringType()
.defaultValue("zstd")
.withDescription(
"Default file compression. For faster read and write, it is recommended to use LZ4.");

public static final ConfigOption<Integer> FILE_COMPRESSION_ZSTD_LEVEL =
key("file.compression.zstd-level")
.intType()
.defaultValue(1)
.withDescription(
"Default file compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.");

public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
key("file.block-size")
.memoryType()
.noDefaultValue()
.withDescription(
"Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by "
+ FILE_COMPRESSION_PER_LEVEL.key());
"File block size of format, default value of orc stripe is 64 MB, and parquet row group is 128 MB.");

public static final ConfigOption<MemorySize> FILE_INDEX_IN_MANIFEST_THRESHOLD =
key("file-index.in-manifest-threshold")
Expand All @@ -173,6 +186,12 @@ public class CoreOptions implements Serializable {
.defaultValue(CoreOptions.FILE_FORMAT_AVRO)
.withDescription("Specify the message format of manifest files.");

public static final ConfigOption<String> MANIFEST_COMPRESSION =
key("manifest.compression")
.stringType()
.defaultValue("zstd")
.withDescription("Default file compression for manifest.");

public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
key("manifest.target-file-size")
.memoryType()
Expand Down Expand Up @@ -1227,6 +1246,10 @@ public FileFormat manifestFormat() {
return createFileFormat(options, MANIFEST_FORMAT);
}

public String manifestCompression() {
return options.get(MANIFEST_COMPRESSION);
}

public MemorySize manifestTargetSize() {
return options.get(MANIFEST_TARGET_FILE_SIZE);
}
Expand Down Expand Up @@ -1325,6 +1348,7 @@ public boolean fieldCollectAggDistinct(String fieldName) {
.defaultValue(false));
}

@Nullable
public String fileCompression() {
return options.get(FILE_COMPRESSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ private static Optional<FileFormat> fromIdentifier(
}

public static FileFormat getFileFormat(Options options, String formatIdentifier) {
int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE);
return FileFormat.fromIdentifier(
formatIdentifier,
new FormatContext(options.removePrefix(formatIdentifier + "."), readBatchSize));
FormatContext context =
new FormatContext(
options.removePrefix(formatIdentifier + "."),
options.get(CoreOptions.READ_BATCH_SIZE),
options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL),
options.get(CoreOptions.FILE_BLOCK_SIZE));
return FileFormat.fromIdentifier(formatIdentifier, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

package org.apache.paimon.format;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

import javax.annotation.Nullable;

/** Factory to create {@link FileFormat}. */
public interface FileFormatFactory {

Expand All @@ -29,12 +33,26 @@ public interface FileFormatFactory {

/** the format context. */
class FormatContext {

private final Options formatOptions;
private final int readBatchSize;
private final int zstdLevel;
@Nullable private final MemorySize blockSize;

@VisibleForTesting
public FormatContext(Options formatOptions, int readBatchSize) {
this(formatOptions, readBatchSize, 1, null);
}

public FormatContext(
Options formatOptions,
int readBatchSize,
int zstdLevel,
@Nullable MemorySize blockSize) {
this.formatOptions = formatOptions;
this.readBatchSize = readBatchSize;
this.zstdLevel = zstdLevel;
this.blockSize = blockSize;
}

public Options formatOptions() {
Expand All @@ -44,5 +62,14 @@ public Options formatOptions() {
public int readBatchSize() {
return readBatchSize;
}

public int zstdLevel() {
return zstdLevel;
}

@Nullable
public MemorySize blockSize() {
return blockSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.paimon.fs.PositionOutputStream;

import javax.annotation.Nullable;

import java.io.IOException;

/** A factory to create {@link FormatWriter} for file. */
Expand All @@ -35,5 +33,5 @@ public interface FormatWriterFactory {
* @throws IOException Thrown if the writer cannot be opened, or if the output stream throws an
* exception.
*/
FormatWriter create(PositionOutputStream out, @Nullable String compression) throws IOException;
FormatWriter create(PositionOutputStream out, String compression) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testSimpleTypes() throws IOException {
FileFormat format = fileFormat();

PositionOutputStream out = fileIO.newOutputStream(file, false);
FormatWriter writer = format.createWriterFactory(rowType).create(out, null);
FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
writer.addElement(GenericRow.of(1, 1L));
writer.addElement(GenericRow.of(2, 2L));
writer.addElement(GenericRow.of(3, null));
Expand Down Expand Up @@ -118,7 +118,7 @@ public void testFullTypes() throws IOException {
FileFormat format = fileFormat();

PositionOutputStream out = fileIO.newOutputStream(file, false);
FormatWriter writer = format.createWriterFactory(rowType).create(out, null);
FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
writer.addElement(expected);
writer.flush();
writer.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ protected ManifestFile.Factory manifestFileFactory(boolean forWrite) {
schemaManager,
partitionType,
options.manifestFormat(),
options.manifestCompression(),
pathFactory(),
options.manifestTargetSize().getBytes(),
forWrite ? writeManifestCache : null);
Expand All @@ -132,12 +133,14 @@ protected ManifestList.Factory manifestListFactory(boolean forWrite) {
return new ManifestList.Factory(
fileIO,
options.manifestFormat(),
options.manifestCompression(),
pathFactory(),
forWrite ? writeManifestCache : null);
}

protected IndexManifestFile.Factory indexManifestFileFactory() {
return new IndexManifestFile.Factory(fileIO, options.manifestFormat(), pathFactory());
return new IndexManifestFile.Factory(
fileIO, options.manifestFormat(), options.manifestCompression(), pathFactory());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ private IndexManifestFile(
FileIO fileIO,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory) {
super(
fileIO,
new IndexManifestEntrySerializer(),
readerFactory,
writerFactory,
compression,
pathFactory,
null);
}
Expand All @@ -68,11 +70,17 @@ public static class Factory {

private final FileIO fileIO;
private final FileFormat fileFormat;
private final String compression;
private final FileStorePathFactory pathFactory;

public Factory(FileIO fileIO, FileFormat fileFormat, FileStorePathFactory pathFactory) {
public Factory(
FileIO fileIO,
FileFormat fileFormat,
String compression,
FileStorePathFactory pathFactory) {
this.fileIO = fileIO;
this.fileFormat = fileFormat;
this.compression = compression;
this.pathFactory = pathFactory;
}

Expand All @@ -82,6 +90,7 @@ public IndexManifestFile create() {
fileIO,
fileFormat.createReaderFactory(schema),
fileFormat.createWriterFactory(schema),
compression,
pathFactory.indexManifestFileFactory());
}
}
Expand Down
Loading

0 comments on commit df06cd1

Please sign in to comment.