[doc] Recommend to use Append For Scalable Table
JingsongLi committed Dec 5, 2023
1 parent 168b682 commit d4a60c1
Showing 1 changed file with 93 additions and 102 deletions.
docs/content/concepts/append-only-table.md
@@ -27,23 +27,111 @@ under the License.
# Append Only Table

If a table does not have a primary key defined, it is an append-only table by default. Separated by the definition of bucket,
we have two different append-only modes: "Append For Scalable Table" and "Append For Queue".

You can only insert a complete record into the table. No delete or update is supported, and you cannot define primary keys.
This type of table is suitable for use cases that do not require updates (such as log data synchronization).

## Append For Scalable Table

### Definition

By defining `'bucket' = '-1'` in table properties, you can assign a special mode (we call it "unaware-bucket mode") to this
table (see [Example]({{< ref "#example" >}})). In this mode, everything is different. We no longer have
the concept of bucket, and we do not guarantee the order of streaming read. We regard this table as a batch offline table (
although we can still stream read and write). All records go into one directory (for compatibility, we put them in bucket-0),
and we no longer maintain their order. Since we do not have the concept of bucket, we no longer shuffle the input records by bucket,
which speeds up inserting.

Using this mode, you can replace your Hive table with a lake table.

{{< img src="/img/for-scalable.png">}}

### Compaction

In unaware-bucket mode, we do not do compaction in the writer. Instead, we use a `Compact Coordinator` to scan the small files and submit compaction tasks
to a `Compact Worker`. This way, we can easily compact a single data directory in parallel. In streaming mode, if you run an insert SQL in Flink,
the topology will look like this:

{{< img src="/img/unaware-bucket-topo.png">}}

It will do its best to compact small files, but when a single small file in a partition remains for a long time
and no new files are added to that partition, the `Compact Coordinator` will remove it from memory to reduce memory usage.
After you restart the job, it will scan the small files and add them to memory again. The options that control the compaction
behavior are exactly the same as for [Append For Queue]({{< ref "#compaction-1" >}}). If you set `write-only` to true, the
`Compact Coordinator` and `Compact Worker` will be removed from the topology.

Auto compaction is only supported in Flink streaming mode. You can also start a dedicated compaction job in Flink via a Paimon Flink action
and disable all other compaction by setting `write-only`, as sketched below.
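A minimal sketch of keeping a write job write-only (assuming Flink SQL dynamic table options are enabled in your setup; `MyTable` and `SourceTable` are hypothetical names):

```sql
-- Persist the option on the table so every writer skips compaction
ALTER TABLE MyTable SET ('write-only' = 'true');

-- Or disable compaction only for this insert job via a dynamic table option
INSERT INTO MyTable /*+ OPTIONS('write-only' = 'true') */
SELECT product_id, price, sales FROM SourceTable;
```

A dedicated compaction job can then take care of the small files produced by the write-only writers.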

### Sort Compact

Data that is out of order within a partition leads to slow selects, and compaction may slow down inserting. A good choice is to set
`write-only` for the inserting job, and after the data of a partition is complete, trigger a partition `Sort Compact` action. See [Sort Compact]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}).

### Streaming Source

An unaware-bucket mode append-only table supports streaming read and write, but no longer guarantees any order. You cannot regard it
as a queue; instead, regard it as a lake with storage bins. Every commit generates a new record bin, and we can read the
increment by reading the new record bins, but records within one bin go wherever they want, and we fetch them in any possible order.
In `Append For Queue` mode, by contrast, records are not stored in bins but in a record pipe. We can see the difference below.

### Streaming Multiple Partitions Write

The number of write tasks that the Paimon sink needs to handle is: the number of partitions to which the data is written * the number of buckets per partition.
Therefore, we need to keep the number of write tasks per Paimon sink task within a reasonable range.
If each sink task handles too many write tasks, not only will it cause problems with too many small files, but it may also lead to out-of-memory errors.

In addition, write failures introduce orphan files, which undoubtedly adds to the cost of maintaining Paimon. We should avoid this problem as much as possible.

For Flink jobs with auto-merge enabled, we recommend following the formula below to adjust the parallelism of the Paimon sink (this does not only apply to append-only tables; it actually applies to most scenarios):
```
(N*B)/P < 100   (this value needs to be adjusted according to the actual situation)

N: the number of partitions to which the data is written
B: the bucket number
P: the parallelism of the Paimon sink
100: an empirically derived threshold. For Flink jobs with auto-merge disabled, this value can be reduced.
     Note, however, that you are only transferring part of the work to the user compaction job; the amount of
     work to be done has not been reduced, and the user compaction job still needs to be sized according to
     the above formula.
```
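As a hypothetical sizing example: writing to 500 partitions of a table with 8 buckets gives N*B = 4000 write tasks, so a sink parallelism above 40 keeps (N*B)/P below 100; adjust the threshold to your actual workload.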
You can also set `write-buffer-spillable` to true, so that the writer can spill records to disk. This reduces small
files as much as possible. To use this option, your Flink cluster needs local disks of a certain size. This is especially important for those running Flink on k8s.

For append-only tables, you can also set the `write-buffer-for-append` option. When this parameter is set to true, the writer will cache
the records in a segment pool to avoid OOM.
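A minimal sketch of enabling these buffers as table properties (option names come from the text above; the table name is hypothetical, and defaults may differ between Paimon versions):

```sql
CREATE TABLE BufferedAppendTable (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'bucket' = '-1',                      -- unaware-bucket append table
    'write-buffer-spillable' = 'true',    -- allow the write buffer to spill to local disk
    'write-buffer-for-append' = 'true'    -- cache records in a segment pool to avoid OOM
);
```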

### Example

The following is an example of creating an append-only table in unaware-bucket mode.

{{< tabs "create-append-only-table-unaware-bucket" >}}

{{< tab "Flink" >}}

```sql
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT
) WITH (
'bucket' = '-1'
);
```
{{< /tab >}}

{{< /tabs >}}
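As a usage sketch (batch statements assuming the `MyTable` definition above; `SourceTable` is a hypothetical upstream table):

```sql
-- Batch write: records are appended without bucket shuffling
INSERT INTO MyTable SELECT product_id, price, sales FROM SourceTable;

-- Batch read, e.g. for offline analysis
SELECT product_id, SUM(sales) AS total_sales
FROM MyTable
GROUP BY product_id;
```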

## Append For Queue

### Definition

In this mode, you can regard the append-only table as a queue separated by buckets. Every record in the same bucket is strictly ordered, and
streaming read will transfer the records downstream exactly in the order of writing. To use this mode, you do not need
to configure anything special; all the data will go into one bucket as a queue. You can also define the `bucket` and
`bucket-key` options to enable larger parallelism and disperse data (see [Example]({{< ref "#example-1" >}}), and the sketch after the image below).

{{< img src="/img/for-queue.png">}}
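A minimal sketch of defining those options (table and key names are hypothetical; the full example referenced above shows the complete setup):

```sql
CREATE TABLE QueueTable (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'bucket' = '4',                  -- four parallel "queues"
    'bucket-key' = 'product_id'      -- records with the same product_id stay ordered within one bucket
);
```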


### Compaction

By default, the sink node will automatically perform compaction to control the number of files. The following options
@@ -189,100 +277,3 @@ CREATE TABLE MyTable (
{{< /tab >}}

{{< /tabs >}}


