Commit
Merge branch 'apache:master' into master
hzjhjjyy authored Jul 6, 2024
2 parents b43dd41 + db8bcd7 commit bb56966
Showing 197 changed files with 4,676 additions and 1,856 deletions.
2 changes: 1 addition & 1 deletion docs/content/concepts/basic-concepts.md
@@ -54,7 +54,7 @@ A manifest file is a file containing changes about LSM data files and changelog

## Data Files

Data files are grouped by partitions. Currently, Paimon supports using orc (default), parquet and avro as data file's format.
Data files are grouped by partitions. Currently, Paimon supports using parquet (default), orc and avro as data file's format.
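
For example, the format could be overridden per table through the `file.format` option (a minimal sketch; the schema and table name are illustrative):

```sql
-- use orc instead of the default parquet for data files
CREATE TABLE my_table (
    k INT,
    v STRING
) WITH (
    'file.format' = 'orc'
);
```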

## Partition

2 changes: 1 addition & 1 deletion docs/content/concepts/specification.md
@@ -157,7 +157,7 @@ The data file meta is:
2. fileSize: file size.
3. rowCount: total number of rows (including add & delete) in this file.
4. minKey: the minimum key of this file.
5. minKey: the maximum key of this file.
5. maxKey: the maximum key of this file.
6. keyStats: the statistics of the key.
7. valueStats: the statistics of the value.
8. minSequenceNumber: the minimum sequence number.
2 changes: 1 addition & 1 deletion docs/content/flink/cdc-ingestion/_index.md
@@ -1,7 +1,7 @@
---
title: CDC Ingestion
bookCollapseSection: true
weight: 96
weight: 95
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
docs/content/maintenance/manage-partition.md → docs/content/flink/expire-partition.md
@@ -1,9 +1,9 @@
---
title: "Manage Partition"
weight: 5
title: "Expire Partition"
weight: 96
type: docs
aliases:
- /maintenance/manage-partition.html
- /flink/expire-partition.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -26,7 +26,7 @@ under the License.

## Expiring Partitions

You can set `partition.expiration-time` when creating a partitioned table. Paimon will periodically check
You can set `partition.expiration-time` when creating a partitioned table. Paimon streaming sink will periodically check
the status of partitions and delete expired partitions according to time.
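
For example, expiration could be configured when the partitioned table is created (a minimal sketch; the schema and values are illustrative, and the check-interval and timestamp-formatter options are assumed companions of `partition.expiration-time`):

```sql
CREATE TABLE my_table (
    dt STRING,
    k INT,
    v STRING
) PARTITIONED BY (dt) WITH (
    -- partitions older than 7 days are considered expired
    'partition.expiration-time' = '7 d',
    -- how often the streaming sink checks for expired partitions
    'partition.expiration-check-interval' = '1 h',
    -- how a timestamp is parsed from the partition value
    'partition.timestamp-formatter' = 'yyyy-MM-dd'
);
```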

How to determine whether a partition has expired: compare the time extracted from the partition with the current
17 changes: 9 additions & 8 deletions docs/content/flink/procedures.md
@@ -127,7 +127,7 @@ All available procedures are listed below.
<td>
To delete a tag. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>tagName: name of the tag to be deleted.</li>
<li>tagName: name of the tag to be deleted. Multiple tags can be specified, separated by ','.</li>
</td>
<td>
CALL sys.delete_tag('default.T', 'my_tag')
@@ -179,14 +179,15 @@ All available procedures are listed below.
</td>
<td>
To remove the orphan data files and metadata files. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>identifier: the target table identifier. Cannot be empty; you can use database_name.* to clean the whole database.</li>
<li>olderThan: to avoid deleting newly written files, this procedure only
deletes orphan files older than 1 day by default. This argument can modify the interval.
</li>
<li>dryRun: when true, view only orphan files, don't actually remove files. Default is false.</li>
</td>
<td>CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)
CALL remove_orphan_files('default.*', '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)
</td>
</tr>
<tr>
@@ -330,24 +331,24 @@ All available procedures are listed below.
<td>
To delete a branch. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>branchName: name of the branch to be deleted.</li>
<li>branchName: name of the branch to be deleted. Multiple branches can be specified, separated by ','.</li>
</td>
<td>
CALL sys.delete_branch('default.T', 'branch1')
</td>
</tr>
<tr>
<td>merge_branch</td>
<td>fast_forward</td>
<td>
CALL [catalog.]sys.merge_branch('identifier', 'branchName')
CALL [catalog.]sys.fast_forward('identifier', 'branchName')
</td>
<td>
To merge a branch to main branch. Arguments:
To fast-forward a branch to the main branch. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>branchName: name of the branch to be merged.</li>
</td>
<td>
CALL sys.merge_branch('default.T', 'branch1')
CALL sys.fast_forward('default.T', 'branch1')
</td>
</tr>
</tbody>
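
Put together, the renamed and extended procedures might be invoked like this (a sketch; the table, tag and branch names are illustrative):

```sql
-- delete several tags at once, separated by ','
CALL sys.delete_tag('default.T', 'tag1,tag2');

-- clean orphan files for a whole database, keeping files newer than the given timestamp
CALL sys.remove_orphan_files('default.*', '2023-10-31 12:00:00');

-- delete several branches at once, separated by ','
CALL sys.delete_branch('default.T', 'branch1,branch2');

-- fast-forward a branch to the main branch (previously merge_branch)
CALL sys.fast_forward('default.T', 'branch1');
```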
2 changes: 1 addition & 1 deletion docs/content/flink/sql-ddl.md
@@ -187,7 +187,7 @@ If you need cross partition upsert (primary keys not contain all partition field
{{< /hint >}}

{{< hint info >}}
By configuring [partition.expiration-time]({{< ref "maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
By configuring [partition.expiration-time]({{< ref "flink/expire-partition" >}}), expired partitions can be automatically deleted.
{{< /hint >}}

### Specify Statistics Mode
10 changes: 5 additions & 5 deletions docs/content/maintenance/manage-branches.md
@@ -127,16 +127,16 @@ INSERT INTO t /*+ OPTIONS('branch' = 'branch1') */ SELECT ...
{{< /tabs >}}
## Merge Branch
## Fast Forward
Merging the custom branch to main will delete all the snapshots, tags and schemas in the main branch that are created after the branch's initial tag. And copy snapshots, tags and schemas from the branch to the main branch.
Fast-forwarding the custom branch to main will delete all the snapshots, tags and schemas in the main branch that were created after the branch's initial tag, and copy the snapshots, tags and schemas from the branch to the main branch.
{{< tabs "merge-branch" >}}
{{< tabs "fast_forward" >}}
{{< tab "Flink" >}}
```sql
CALL sys.merge_branch('default.T', 'branch1');
CALL sys.fast_forward('default.T', 'branch1');
```
{{< /tab >}}
@@ -148,7 +148,7 @@ Run the following command:
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
merge_branch \
fast_forward \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
18 changes: 11 additions & 7 deletions docs/content/maintenance/manage-snapshots.md
@@ -216,7 +216,7 @@ Please note that too short retain time or too small retain number may result in:
- Batch queries cannot find the file. For example, the table is relatively large and
the batch query takes 10 minutes to read, but the snapshot from 10 minutes ago
expires, at which point the batch query will read a deleted snapshot.
- Streaming reading jobs on table files (without the external log system) fail to restart.
- Streaming reading jobs on table files fail to restart.
When the job restarts, the snapshot it recorded may have expired. (You can use
[Consumer Id]({{< ref "flink/sql-query#consumer-id" >}}) to protect streaming reading
in a small retain time of snapshot expiration).
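
As a sketch of the Consumer Id protection mentioned above (the option name follows the linked documentation; the table and id are illustrative):

```sql
-- a streaming read that records its progress under a consumer id,
-- so snapshots it still needs are protected from expiration
SELECT * FROM my_table /*+ OPTIONS('consumer-id' = 'my-streaming-job') */;
```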
@@ -296,7 +296,15 @@ submit a `remove_orphan_files` job to clean them:
{{< tabs "remove_orphan_files" >}}
{{< tab "Flink" >}}
{{< tab "Spark SQL/Flink SQL" >}}
```sql
CALL sys.remove_orphan_files(table => "my_db.my_table", [older_than => "2023-10-31 12:00:00"])

CALL sys.remove_orphan_files(table => "my_db.*", [older_than => "2023-10-31 12:00:00"])
```
{{< /tab >}}
{{< tab "Flink Action" >}}
```bash
<FLINK_HOME>/bin/flink run \
@@ -322,12 +330,8 @@ To avoid deleting files that are newly added by other writing jobs, this action
--older_than '2023-10-31 12:00:00'
```
{{< /tab >}}
The table can be `*` to clean all tables in the database.
{{< tab "Spark" >}}
```sql
CALL sys.remove_orphan_files(table => "tableId", [older_than => "2023-10-31 12:00:00"])
```
{{< /tab >}}
{{< /tabs >}}
105 changes: 1 addition & 104 deletions docs/content/maintenance/write-performance.md
@@ -68,109 +68,6 @@ It is recommended that the parallelism of sink should be less than or equal to t
</tbody>
</table>

## Compaction

### Asynchronous Compaction

Compaction is inherently asynchronous, but if you want it to be completely asynchronous and not blocking writing,
expect a mode to have maximum writing throughput, the compaction can be done slowly and not in a hurry.
You can use the following strategies for your table:

```shell
num-sorted-run.stop-trigger = 2147483647
sort-spill-threshold = 10
changelog-producer.lookup-wait = false
```

This configuration will generate more files during peak write periods and gradually merge into optimal read
performance during low write periods.

In the case of `'changelog-producer' = 'lookup'`, by default, the lookup will be completed at checkpointing, which
will block the checkpoint. So if you want an asynchronous lookup, you should also set `'changelog-producer.lookup-wait' = 'false'`.

### Number of Sorted Runs to Pause Writing

When the number of sorted runs is small, Paimon writers will perform compaction asynchronously in separated threads, so
records can be continuously written into the table. However, to avoid unbounded growth of sorted runs, writers will
pause writing when the number of sorted runs hits the threshold. The following table property determines
the threshold.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Option</th>
<th class="text-left" style="width: 5%">Required</th>
<th class="text-left" style="width: 5%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 60%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>num-sorted-run.stop-trigger</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
</tr>
</tbody>
</table>

Write stalls will become less frequent when `num-sorted-run.stop-trigger` becomes larger, thus improving writing
performance. However, if this value becomes too large, more memory and CPU time will be needed when querying the
table. If you are concerned about the OOM problem, please configure the following option.
Its value depends on your memory size.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Option</th>
<th class="text-left" style="width: 5%">Required</th>
<th class="text-left" style="width: 5%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 60%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>sort-spill-threshold</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>If the maximum number of sort readers exceeds this value, a spill will be attempted. This prevents too many readers from consuming too much memory and causing OOM.</td>
</tr>
</tbody>
</table>

### Number of Sorted Runs to Trigger Compaction

Paimon uses [LSM tree]({{< ref "primary-key-table/overview#lsm-trees" >}}) which supports a large number of updates. LSM organizes files in several [sorted runs]({{< ref "primary-key-table/overview#sorted-runs" >}}). When querying records from an LSM tree, all sorted runs must be combined to produce a complete view of all records.

One can easily see that too many sorted runs will result in poor query performance. To keep the number of sorted runs in a reasonable range, Paimon writers will automatically perform [compactions]({{< ref "primary-key-table/overview#compaction" >}}). The following table property determines the minimum number of sorted runs to trigger a compaction.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Option</th>
<th class="text-left" style="width: 5%">Required</th>
<th class="text-left" style="width: 5%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 60%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>num-sorted-run.compaction-trigger</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run).</td>
</tr>
</tbody>
</table>

Compaction will become less frequent when `num-sorted-run.compaction-trigger` becomes larger, thus improving writing performance. However, if this value becomes too large, more memory and CPU time will be needed when querying the table. This is a trade-off between writing and query performance.

## Local Merging

If your job suffers from primary key data skew
@@ -239,7 +136,7 @@ There are three main places in Paimon writer that takes up memory:
* Writer's memory buffer, shared and preempted by all writers of a single task. This memory value can be adjusted by the `write-buffer-size` table property.
* Memory consumed when merging several sorted runs for compaction. Can be adjusted by the `num-sorted-run.compaction-trigger` option to change the number of sorted runs to be merged.
* If the row is very large, reading too many lines of data at once will consume a lot of memory when making a compaction. Reducing the `read.batch-size` option can alleviate the impact of this case.
* The memory consumed by writing columnar (ORC, Parquet, etc.) file. Decreasing the `orc.write.batch-size` option can reduce the consumption of memory for ORC format.
* The memory consumed by writing a columnar ORC file. Decreasing the `orc.write.batch-size` option can reduce the consumption of memory for ORC format.
* If files are automatically compacted in the write task, dictionaries for certain large columns can significantly consume memory during compaction.
* To disable dictionary encoding for all fields in Parquet format, set `'parquet.enable.dictionary' = 'false'`.
* To disable dictionary encoding for all fields in ORC format, set `orc.dictionary.key.threshold='0'`. Additionally, set `orc.column.encoding.direct='field1,field2'` to disable dictionary encoding for specific columns.
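
As an illustration, several of these properties could be adjusted on an existing table (a sketch; the values are placeholders, not recommendations):

```sql
ALTER TABLE my_table SET (
    -- larger shared write buffer for the writers of a task
    'write-buffer-size' = '512 mb',
    -- read fewer rows per batch when compacting very wide rows
    'read.batch-size' = '256',
    -- smaller ORC writer batches to reduce write memory
    'orc.write.batch-size' = '512',
    -- disable dictionary encoding for all Parquet fields
    'parquet.enable.dictionary' = 'false'
);
```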
6 changes: 2 additions & 4 deletions docs/content/primary-key-table/changelog-producer.md
@@ -1,6 +1,6 @@
---
title: "Changelog Producer"
weight: 4
weight: 5
type: docs
aliases:
- /primary-key-table/changelog-producer.html
@@ -31,9 +31,7 @@ Streaming write can continuously produce the latest changes for streaming read.
By specifying the `changelog-producer` table property when creating the table, users can choose the pattern of changes produced from table files.

{{< hint info >}}

The `changelog-producer` table property only affects changelog from table files. It does not affect the external log system.

`changelog-producer` may significantly reduce compaction performance; please do not enable it unless necessary.
{{< /hint >}}
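
For example, the property is chosen when the table is created (a minimal sketch; 'lookup' is only one of the supported values):

```sql
CREATE TABLE my_table (
    k INT,
    v STRING,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    -- produce changelog from table files via lookup
    'changelog-producer' = 'lookup'
);
```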

## None