diff --git a/docs/content/flink/consumer-id.md b/docs/content/flink/consumer-id.md new file mode 100644 index 000000000000..89450bf57d7e --- /dev/null +++ b/docs/content/flink/consumer-id.md @@ -0,0 +1,120 @@ +--- +title: "Consumer ID" +weight: 5 +type: docs +aliases: +- /flink/consumer-id.html +--- + + +# Consumer ID + +Consumer id can help you accomplish the following two things: + +1. Safe consumption: When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in + the file system, and if there are consumers that still depend on this snapshot, then this snapshot will not be + deleted by expiration. +2. Resume from breakpoint: When previous job is stopped, the newly started job can continue to consume from the previous + progress without resuming from the state. + +## Usage + +You can specify the `consumer-id` when streaming read table. + +The consumer will prevent expiration of the snapshot. In order to prevent too many snapshots caused by mistakes, +you need to specify `'consumer.expiration-time'` to manage the lifetime of consumers. + +```sql +ALTER TABLE t SET ('consumer.expiration-time' = '1 d'); +``` + +Then, restart streaming write job of this table, expiration of consumers will be triggered in writing job. + +```sql +SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.mode' = 'at-least-once') */; +``` + +## Ignore Progress + +Sometimes, you only want the feature of 'Safe Consumption'. You want to get a new snapshot progress when restarting the +stream consumption job , you can enable the `'consumer.ignore-progress'` option. + +```sql +SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.ignore-progress' = 'true') */; +``` + +The startup of this job will retrieve the snapshot that should be read again. + +## Consumer Mode + +By default, the consumption of snapshots is strictly aligned within the checkpoint to make 'Resume from breakpoint' +feature exactly-once. + +But in some scenarios where you don't need 'Resume from breakpoint', or you don't need strict 'Resume from breakpoint', +you can consider enabling `'consumer.mode' = 'at-least-once'` mode. This mode: +1. Allow readers consume snapshots at different rates and record the slowest snapshot-id among all readers into the + consumer. It doesn't affect the checkpoint time and have good performance. +2. This mode can provide more capabilities, such as watermark alignment. + +{{< hint >}} +About `'consumer.mode'`, since the implementation of `exactly-once` mode and `at-least-once` mode are completely +different, the state of flink is incompatible and cannot be restored from the state when switching modes. +{{< /hint >}} + +## Rest Consumer + +You can reset or delete a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given +consumer ID. First, you need to stop the streaming task using this consumer ID, and then execute the reset consumer +action job. + +Run the following command: + +{{< tabs "reset_consumer" >}} + +{{< tab "Flink SQL" >}} + +```sql +CALL sys.reset_consumer( + `table` => 'database_name.table_name', + consumer_id => 'consumer_id', + next_snapshot_id => +); +-- No next_snapshot_id if you want to delete the consumer +``` +{{< /tab >}} + +{{< tab "Flink Action" >}} + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + reset-consumer \ + --warehouse \ + --database \ + --table \ + --consumer_id \ + [--next_snapshot ] \ + [--catalog_conf [--catalog_conf ...]] + +## No next_snapshot if you want to delete the consumer +``` +{{< /tab >}} + +{{< /tabs >}} diff --git a/docs/content/flink/sql-alter.md b/docs/content/flink/sql-alter.md index fe96ec413796..a3dbde541f11 100644 --- a/docs/content/flink/sql-alter.md +++ b/docs/content/flink/sql-alter.md @@ -1,6 +1,6 @@ --- title: "SQL Alter" -weight: 6 +weight: 7 type: docs aliases: - /flink/sql-alter.html diff --git a/docs/content/flink/sql-lookup.md b/docs/content/flink/sql-lookup.md index 296b9c862eee..8b95c6cb1b15 100644 --- a/docs/content/flink/sql-lookup.md +++ b/docs/content/flink/sql-lookup.md @@ -1,6 +1,6 @@ --- title: "SQL Lookup" -weight: 5 +weight: 6 type: docs aliases: - /flink/sql-lookup.html diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md index bb26d5d3c66a..89136b0b0635 100644 --- a/docs/content/flink/sql-query.md +++ b/docs/content/flink/sql-query.md @@ -172,70 +172,6 @@ prevent you from reading older incremental data. So, Paimon also provides anothe SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */; ``` -### Consumer ID - -You can specify the `consumer-id` when streaming read table: -```sql -SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.expiration-time' = '1 d', 'consumer.mode' = 'at-least-once') */; -``` - -When stream read Paimon tables, the next snapshot id to be recorded into the file system. This has several advantages: - -1. When previous job is stopped, the newly started job can continue to consume from the previous progress without - resuming from the state. The newly reading will start reading from next snapshot id found in consumer files. - If you don't want this behavior, you can set `'consumer.ignore-progress'` to true. -2. When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system, - and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration. - -{{< hint warning >}} -NOTE 1: The consumer will prevent expiration of the snapshot. You can specify `'consumer.expiration-time'` to manage the -lifetime of consumers. - -NOTE 2: If you don't want to affect the checkpoint time, you need to configure `'consumer.mode' = 'at-least-once'`. -This mode allow readers consume snapshots at different rates and record the slowest snapshot-id among all readers into -the consumer. This mode can provide more capabilities, such as watermark alignment. - -NOTE 3: About `'consumer.mode'`, since the implementation of `exactly-once` mode and `at-least-once` mode are completely -different, the state of flink is incompatible and cannot be restored from the state when switching modes. -{{< /hint >}} - -You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID. -First, you need to stop the streaming task using this consumer ID, and then execute the reset consumer action job. - -Run the following command: - -{{< tabs "reset_consumer" >}} - -{{< tab "Flink SQL" >}} - -```sql -CALL sys.reset_consumer( - `table` => 'database_name.table_name', - consumer_id => 'consumer_id', - next_snapshot_id -> -); -``` -{{< /tab >}} - -{{< tab "Flink Action" >}} - -```bash -/bin/flink run \ - /path/to/paimon-flink-action-{{< version >}}.jar \ - reset-consumer \ - --warehouse \ - --database \ - --table \ - --consumer_id \ - [--next_snapshot ] \ - [--catalog_conf [--catalog_conf ...]] -``` -{{< /tab >}} - -{{< /tabs >}} - -please don't specify --next_snapshot parameter if you want to delete the consumer. - ### Read Overwrite Streaming reading will ignore the commits generated by `INSERT OVERWRITE` by default. If you want to read the