Commit

Merge branch 'apache:master' into master
hzjhjjyy authored Jun 4, 2024
2 parents 1242646 + 97aa226 commit a92c221
Showing 127 changed files with 6,041 additions and 663 deletions.
4 changes: 4 additions & 0 deletions LICENSE
@@ -266,6 +266,10 @@ paimon-format/src/main/java/org/apache/orc/OrcConf.java
paimon-format/src/main/java/org/apache/orc/OrcFile.java
from https://orc.apache.org/ version 2.0

paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
from https://parquet.apache.org/ version 1.14.0

MIT License
-----------

4 changes: 1 addition & 3 deletions docs/content/append-table/append-table.md
@@ -39,14 +39,12 @@ CREATE TABLE my_table (
price DOUBLE,
sales BIGINT
) WITH (
'file.compression' = 'zstd'
'file.compression.zstd-level' = '3'
);
```
{{< /tab >}}
{{< /tabs >}}

The recommended compression for the Append table is `'zstd'`.

## Automatic small file merging

In a streaming write job without bucket definition, there is no compaction in the writer; instead, it will use
6 changes: 3 additions & 3 deletions docs/content/engines/overview.md
@@ -28,10 +28,10 @@ under the License.

## Compatibility Matrix

| Engine | Version | Batch Read | Batch Write | Create Table | Alter Table | Streaming Write | Streaming Read | Batch Overwrite | DELETE & UPDATE | MERGE INTO |
|:-------------------------------------------------------------------------------:|:-------------:|:-----------:|:-------------:|:-------------:|:-------------:|:----------------:|:----------------:|:----------------:|:------------------:|:-----------:|
| Engine | Version | Batch Read | Batch Write | Create Table | Alter Table | Streaming Write | Streaming Read | Batch Overwrite | DELETE & UPDATE | MERGE INTO |
|:-------------------------------------------------------------------------------:|:-------------:|:-----------:|:-------------:|:-------------:|:-------------:|:----------------:|:----------------:|:---------------:|:------------------:|:-----------:|
| Flink | 1.15 - 1.19 |||| ✅(1.17+) |||| ✅(1.17+) ||
| Spark | 3.1 - 3.5 || ✅(3.3+) ||| ✅(3.3+) | ✅(3.3+) | ✅(3.3+) | ✅(3.2+) | ✅(3.2+) |
| Spark | 3.1 - 3.5 || ✅(3.3+) ||| ✅(3.3+) | ✅(3.3+) | ✅(3.2+) | ✅(3.2+) | ✅(3.2+) |
| Hive | 2.1 - 3.1 ||||||||||
| Trino | 420 - 439 || ✅(427+) | ✅(427+) | ✅(427+) ||||||
| Presto | 0.236 - 0.280 ||||||||||
32 changes: 32 additions & 0 deletions docs/content/flink/sql-write.md
@@ -49,6 +49,38 @@ snapshot expiration, and even partition expiration in Flink Sink (if it is confi

For multiple jobs to write the same table, you can refer to [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.

### Clustering

In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/append-table#Append Table" >}})
based on the values of certain columns during the write process. This organization of data can significantly enhance the efficiency of downstream
tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Table]({{< ref "append-table/append-table#Append Table" >}})
in batch execution mode.

To utilize clustering, you can specify the columns you want to cluster when creating or writing to a table. Here's a simple example of how to enable clustering:

```sql
CREATE TABLE my_table (
a STRING,
b STRING,
c STRING
) WITH (
'sink.clustering.by-columns' = 'a,b'
);
```

You can also use SQL hints to dynamically set clustering options:

```sql
INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */
SELECT * FROM source;
```

The data is clustered using an automatically chosen strategy (such as ORDER, ZORDER, or HILBERT), but you can manually specify the clustering strategy
by setting `sink.clustering.strategy`. Clustering relies on sampling and sorting. If the clustering process takes too much time, you can decrease
the total number of samples by setting `sink.clustering.sample-factor`, or disable the sorting step by setting `sink.clustering.sort-in-cluster` to `false`.
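
For illustration, here is a minimal sketch that sets these options through a SQL hint; the table and source names and the concrete values are only examples:

```sql
-- Batch write into an append table, clustering by columns a and b.
-- 'zorder' and the larger sample factor are purely illustrative choices.
INSERT INTO my_table /*+ OPTIONS(
    'sink.clustering.by-columns' = 'a,b',
    'sink.clustering.strategy' = 'zorder',
    'sink.clustering.sample-factor' = '500') */
SELECT * FROM source;
```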

You can refer to [FlinkConnectorOptions]({{< ref "maintenance/configurations#FlinkConnectorOptions" >}}) for more info about the configurations above.

## Overwriting the Whole Table

For unpartitioned tables, Paimon supports overwriting the whole table.
2 changes: 1 addition & 1 deletion docs/content/maintenance/dedicated-compaction.md
@@ -96,7 +96,7 @@ Run the following command to submit a compaction job for the table.
--table <table-name> \
[--partition <partition-name>] \
[--table_conf <table_conf>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
Example: compact table
6 changes: 2 additions & 4 deletions docs/content/maintenance/write-performance.md
@@ -210,11 +210,9 @@ layers to be in Avro format.

## File Compression

By default, Paimon uses high-performance compression algorithms such as LZ4 and SNAPPY, but their compression rates
are not so good. If you want to reduce the write/read performance, you can modify the compression algorithm:
By default, Paimon uses zstd with level 1; you can modify the compression algorithm:

1. `'file.compression'`: Default file compression format. If you need a higher compression rate, I recommend using `'ZSTD'`.
2. `'file.compression.per.level'`: Define different compression policies for different level. For example `'0:lz4,1:zstd'`.
`'file.compression.zstd-level'`: The default zstd level is 1. For higher compression rates, it can be configured to 9, but read and write speeds will decrease significantly.
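
As a minimal sketch (the table name and schema are illustrative), the zstd level can be set when creating a table or changed later with `ALTER TABLE`:

```sql
-- Data files are written with zstd level 9: better compression, slower I/O.
CREATE TABLE my_table (
    id BIGINT,
    data STRING
) WITH (
    'file.compression' = 'zstd',
    'file.compression.zstd-level' = '9'
);

-- Or adjust the level on an existing table; files written afterwards use the new level.
ALTER TABLE my_table SET ('file.compression.zstd-level' = '9');
```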

## Stability

82 changes: 82 additions & 0 deletions docs/content/migration/clone-tables.md
@@ -0,0 +1,82 @@
---
title: "Clone Tables"
weight: 3
type: docs
aliases:
- /migration/clone-tables.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Clone Tables

Paimon supports cloning tables for data migration.
Currently, only table files used by the latest snapshot will be cloned.

To clone a table, run the following command to submit a clone job.
If the table you clone is not being written to at the same time, it is recommended to submit a Flink batch job for better performance.
However, if you want to clone the table while it is being written to, submit a Flink streaming job for automatic failure recovery.

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
clone \
--warehouse <source-warehouse-path> \
[--database <source-database-name>] \
[--table <source-table-name>] \
[--catalog_conf <source-paimon-catalog-conf> [--catalog_conf <source-paimon-catalog-conf> ...]] \
--target_warehouse <target-warehouse-path> \
[--target_database <target-database>] \
[--target_table <target-table-name>] \
[--target_catalog_conf <target-paimon-catalog-conf> [--target_catalog_conf <target-paimon-catalog-conf> ...]] \
[--parallelism <parallelism>]
```
{{< hint info >}}
1. If `database` is not specified, all tables in all databases of the specified warehouse will be cloned.
2. If `table` is not specified, all tables of the specified database will be cloned.
{{< /hint >}}
Example: Clone `test_db.test_table` from source warehouse to target warehouse.
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
clone \
--warehouse s3:///path/to/warehouse_source \
--database test_db \
--table test_table \
--catalog_conf s3.endpoint=https://****.com \
--catalog_conf s3.access-key=***** \
--catalog_conf s3.secret-key=***** \
--target_warehouse s3:///path/to/warehouse_target \
--target_database test_db \
--target_table test_table \
--target_catalog_conf s3.endpoint=https://****.com \
--target_catalog_conf s3.access-key=***** \
--target_catalog_conf s3.secret-key=*****
```
For more usage of the clone action, see
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
clone --help
```
5 changes: 2 additions & 3 deletions docs/content/primary-key-table/merge-engine.md
@@ -354,9 +354,8 @@ By specifying `'merge-engine' = 'first-row'`, users can keep the first row of th
`deduplicate` merge engine in that the `first-row` merge engine generates an insert-only changelog.

{{< hint info >}}
1. `first-row` merge engine must be used together with `lookup` [changelog producer]({{< ref "primary-key-table/changelog-producer" >}}).
2. You can not specify `sequence.field`.
3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records.
1. You cannot specify `sequence.field`.
2. `DELETE` and `UPDATE_BEFORE` messages are not accepted. You can configure `ignore-delete` to ignore these two kinds of records, as sketched below.
{{< /hint >}}

This is of great help in replacing log deduplication in streaming computation.
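
As a minimal sketch (the table name and schema are illustrative), a `first-row` table that ignores `DELETE` and `UPDATE_BEFORE` records could be declared like this:

```sql
-- 'ignore-delete' = 'true' drops DELETE / UPDATE_BEFORE records instead of failing.
CREATE TABLE first_row_table (
    k INT,
    v STRING,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine' = 'first-row',
    'ignore-delete' = 'true'
);
```
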
34 changes: 26 additions & 8 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -188,18 +188,18 @@
<td>Boolean</td>
<td>Force produce changelog in delete sql, or you can use 'streaming-read-overwrite' to read changelog from overwrite commit.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>deletion-vector.index-file.target-size</h5></td>
<td style="word-wrap: break-word;">2 mb</td>
<td>MemorySize</td>
<td>The target size of deletion vector index file.</td>
</tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -249,17 +249,29 @@
<td>The threshold for read file async.</td>
</tr>
<tr>
<td><h5>file.compression</h5></td>
<td><h5>file.block-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
<td>File block size of the format; the default orc stripe size is 64 MB, and the default parquet row group size is 128 MB.</td>
</tr>
<tr>
<td><h5>file.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by file.compression.per.level</td>
<td>Default file compression. For faster read and write, it is recommended to use LZ4.</td>
</tr>
<tr>
<td><h5>file.compression.per.level</h5></td>
<td style="word-wrap: break-word;"></td>
<td>Map</td>
<td>Define different compression policies for different levels. You can add the conf like this: 'file.compression.per.level' = '0:lz4,1:zstd'.</td>
</tr>
<tr>
<td><h5>file.compression.zstd-level</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Default zstd level for file compression. For higher compression rates, it can be configured to 9, but read and write speeds will decrease significantly.</td>
</tr>
<tr>
<td><h5>file.format</h5></td>
<td style="word-wrap: break-word;">"orc"</td>
@@ -357,6 +369,12 @@
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Default file compression for manifest.</td>
</tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">"avro"</td>
Original file line number Diff line number Diff line change
@@ -32,6 +32,12 @@
<td>Boolean</td>
<td>When changelog-producer is set to LOOKUP, commit will wait for changelog generation by lookup.</td>
</tr>
<tr>
<td><h5>end-input.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Optional endInput watermark used in case of batch mode or bounded stream.</td>
</tr>
<tr>
<td><h5>lookup.async</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -170,6 +176,30 @@
<td>Duration</td>
<td>If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators.</td>
</tr>
<tr>
<td><h5>sink.clustering.by-columns</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, the range partitioning feature is not enabled. This option takes effect only for bucket-unaware tables without primary keys, and only in batch execution mode.</td>
</tr>
<tr>
<td><h5>sink.clustering.sample-factor</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Specifies the sample factor. Let S represent the total number of samples, F represent the sample factor, and P represent the sink parallelism, then S=F×P. The minimum allowed sample factor is 20.</td>
</tr>
<tr>
<td><h5>sink.clustering.sort-in-cluster</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicates whether to further sort the data belonging to each sink task after range partitioning.</td>
</tr>
<tr>
<td><h5>sink.clustering.strategy</h5></td>
<td style="word-wrap: break-word;">"auto"</td>
<td>String</td>
<td>Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, respectively. When not configured, the algorithm is determined automatically based on the number of columns in 'sink.clustering.by-columns': 'order' is used for 1 column, 'zorder' for fewer than 5 columns, and 'hilbert' for 5 or more columns.</td>
</tr>
<tr>
<td><h5>sink.committer-cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
12 changes: 0 additions & 12 deletions docs/layouts/shortcodes/generated/orc_configuration.html
@@ -32,18 +32,6 @@
<td>Integer</td>
<td>Comma-separated list of fields for which dictionary encoding is to be skipped in orc.</td>
</tr>
<tr>
<td><h5>orc.compress</h5></td>
<td style="word-wrap: break-word;">"lz4"</td>
<td>String</td>
<td>Define the compression codec for ORC file, if a higher compression ratio is required, it is recommended to configure it as 'zstd', and you can configure: orc.compression.zstd.level</td>
</tr>
<tr>
<td><h5>orc.compression.zstd.level</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Define the compression level to use with ZStandard codec while writing data. The valid range is 1~22.</td>
</tr>
<tr>
<td><h5>orc.dictionary.key.threshold</h5></td>
<td style="word-wrap: break-word;">0.8</td>
Original file line number Diff line number Diff line change
@@ -107,6 +107,33 @@ public void testOrcReadProjection1() throws Exception {
*/
}

@Test
public void testParquetReadProjection() throws Exception {
innerTestProjection(
Collections.singletonMap("parquet", prepareData(orc(), "parquet")),
new int[] {0, 5, 10, 14});
/*
* OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
* Apple M1 Pro
* read: Best/Avg Time(ms) Row Rate(K/s) Per Row(ns) Relative
* ------------------------------------------------------------------------------------------------
* OPERATORTEST_read_read-orc 716 / 728 4187.4 238.8 1.0X
*/
}

@Test
public void testParquetReadProjection1() throws Exception {
innerTestProjection(
Collections.singletonMap("parquet", prepareData(orc(), "parquet")), new int[] {10});
/*
* OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
* Apple M1 Pro
* read: Best/Avg Time(ms) Row Rate(K/s) Per Row(ns) Relative
* ------------------------------------------------------------------------------------------------
* OPERATORTEST_read_read-orc 716 / 728 4187.4 238.8 1.0X
*/
}

private Options orc() {
Options options = new Options();
options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_ORC);