[doc] Add Flink Procedure with named argument to documents (apache#4167)
yunfengzhou-hub authored Sep 13, 2024
1 parent 15d3302 commit 5ee9373
Showing 9 changed files with 352 additions and 105 deletions.
126 changes: 109 additions & 17 deletions docs/content/flink/procedures.md
@@ -59,6 +59,16 @@ All available procedures are listed below.
<tr>
<td>compact</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.compact(
`table` => 'table',
partitions => 'partitions',
order_strategy => 'order_strategy',
order_by => 'order_by',
options => 'options',
`where` => 'where',
partition_idle_time => 'partition_idle_time') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.compact('table') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions') <br/><br/>
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by') <br/><br/>
@@ -86,12 +96,21 @@ All available procedures are listed below.
<tr>
<td>compact_database</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.compact_database(
including_databases => 'includingDatabases',
mode => 'mode',
including_tables => 'includingTables',
excluding_tables => 'excludingTables',
table_options => 'tableOptions',
partition_idle_time => 'partitionIdleTime') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.compact_database() <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
</td>
<td>
@@ -106,12 +125,23 @@ All available procedures are listed below.
<li>partition_idle_time: this is used to do a full compaction for partitions which have not received any new data for 'partition_idle_time'. Only these partitions will be compacted.</li>
</td>
<td>
CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')
CALL sys.compact_database(
including_databases => 'db1|db2',
mode => 'combined',
including_tables => 'table_.*',
excluding_tables => 'ignore',
table_options => 'sink.parallelism=4')
</td>
</tr>
<tr>
<td>create_tag</td>
<td>
-- Use named argument<br/>
-- based on the specified snapshot <br/>
CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName', snapshot_id => snapshotId) <br/>
-- based on the latest snapshot <br/>
CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName') <br/><br/>
-- Use indexed argument<br/>
-- based on the specified snapshot <br/>
CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId) <br/>
-- based on the latest snapshot <br/>
@@ -125,13 +155,16 @@ All available procedures are listed below.
<li>time_retained: The maximum time retained for newly created tags.</li>
</td>
<td>
CALL sys.create_tag('default.T', 'my_tag', 10, '1 d')
CALL sys.create_tag(`table` => 'default.T', tag => 'my_tag', snapshot_id => cast(10 as bigint), time_retained => '1 d')
</td>
</tr>
<tr>
<td>create_tag_from_timestamp</td>
<td>
-- Create a tag from the first snapshot whose commit-time is greater than the specified timestamp. <br/>
-- Use named argument<br/>
CALL [catalog.]sys.create_tag_from_timestamp(`table` => 'identifier', tag => 'tagName', timestamp => timestamp, time_retained => time_retained) <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.create_tag_from_timestamp('identifier', 'tagName', timestamp, time_retained)
</td>
<td>
@@ -152,6 +185,9 @@ All available procedures are listed below.
<td>create_tag_from_watermark</td>
<td>
-- Create a tag from the first snapshot whose watermark is greater than the specified timestamp.<br/>
-- Use named argument<br/>
CALL [catalog.]sys.create_tag_from_watermark(`table` => 'identifier', tag => 'tagName', watermark => watermark, time_retained => time_retained) <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.create_tag_from_watermark('identifier', 'tagName', watermark, time_retained)
</td>
<td>
@@ -171,6 +207,9 @@ All available procedures are listed below.
<tr>
<td>delete_tag</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.delete_tag(`table` => 'identifier', tag => 'tagName') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.delete_tag('identifier', 'tagName')
</td>
<td>
@@ -179,17 +218,30 @@ All available procedures are listed below.
<li>tagName: name of the tag to be deleted. If you specify multiple tags, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_tag('default.T', 'my_tag')
CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
</td>
</tr>
<tr>
<td>merge_into</td>
<td>
-- for Flink 1.18<br/>
CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedUpsertCondition','matchedUpsertSetting',<br/>
'notMatchedInsertCondition','notMatchedInsertValues',<br/>
'matchedDeleteCondition')<br/><br/>
-- for Flink 1.19 and later <br/>
CALL [catalog.]sys.merge_into(<br/>
target_table => 'identifier',<br/>
target_alias => 'targetAlias',<br/>
source_sqls => 'sourceSqls',<br/>
source_table => 'sourceTable',<br/>
merge_condition => 'mergeCondition',<br/>
matched_upsert_condition => 'matchedUpsertCondition',<br/>
matched_upsert_setting => 'matchedUpsertSetting',<br/>
not_matched_insert_condition => 'notMatchedInsertCondition',<br/>
not_matched_insert_values => 'notMatchedInsertValues',<br/>
matched_delete_condition => 'matchedDeleteCondition') <br/><br/>
</td>
<td>
To perform "MERGE INTO" syntax. See <a href="/how-to/writing-tables#merging-into-table">merge_into action</a> for
@@ -201,6 +253,9 @@ All available procedures are listed below.
-- and if there is no match,<br/>
-- insert the order from<br/>
-- the source table<br/>
-- for Flink 1.18<br/>
CALL sys.merge_into('default.T','','','default.S','T.id=S.order_id','','price=T.price+20','','*','')<br/><br/>
-- for Flink 1.19 and later <br/>
CALL sys.merge_into(<br/>
target_table => 'default.T',<br/>
source_table => 'default.S',<br/>
@@ -212,6 +267,9 @@ All available procedures are listed below.
<tr>
<td>remove_orphan_files</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.remove_orphan_files(`table` => 'identifier', older_than => 'olderThan', dry_run => 'dryRun') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.remove_orphan_files('identifier')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')<br/><br/>
@@ -226,15 +284,18 @@ All available procedures are listed below.
<li>dryRun: when true, view only orphan files, don't actually remove files. Default is false.</li>
<li>parallelism: The maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.</li>
</td>
<td>CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files('default.*', '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)<br/><br/>
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', false, '5')
<td>CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files(`table` => 'default.*', older_than => '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)<br/><br/>
CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5')
</td>
</tr>
<tr>
<td>reset_consumer</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.reset_consumer(`table` => 'identifier', consumer_id => 'consumerId', next_snapshot_id => 'nextSnapshotId') <br/><br/>
-- Use indexed argument<br/>
-- reset the new next snapshot id in the consumer<br/>
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)<br/><br/>
-- delete consumer<br/>
@@ -246,11 +307,17 @@ All available procedures are listed below.
<li>consumerId: consumer to be reset or deleted.</li>
<li>nextSnapshotId (Long): the new next snapshot id of the consumer.</li>
</td>
<td>CALL sys.reset_consumer('default.T', 'myid', 10)</td>
<td>CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint))</td>
</tr>
<tr>
<td>rollback_to</td>
<td>
-- for Flink 1.18<br/>
-- rollback to a snapshot<br/>
CALL sys.rollback_to('identifier', snapshotId)<br/><br/>
-- rollback to a tag<br/>
CALL sys.rollback_to('identifier', 'tagName')<br/><br/>
-- for Flink 1.19 and later<br/>
-- rollback to a snapshot<br/>
CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId)<br/><br/>
-- rollback to a tag<br/>
@@ -262,11 +329,24 @@ All available procedures are listed below.
<li>snapshotId (Long): id of the snapshot that will roll back to.</li>
<li>tagName: name of the tag that will roll back to.</li>
</td>
<td>CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)</td>
<td>
-- for Flink 1.18<br/>
CALL sys.rollback_to('default.T', 10)<br/><br/>
-- for Flink 1.19 and later<br/>
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
</td>
</tr>
<tr>
<td>expire_snapshots</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.expire_snapshots(<br/>
`table` => 'identifier', <br/>
retain_max => 'retain_max', <br/>
retain_min => 'retain_min', <br/>
older_than => 'older_than', <br/>
max_deletes => 'max_deletes') <br/><br/>
-- Use indexed argument<br/>
-- for Flink 1.18<br/>
CALL sys.expire_snapshots(table, retain_max)<br/><br/>
-- for Flink 1.19 and later<br/>
@@ -329,11 +409,14 @@ All available procedures are listed below.
<li>databaseName : the target database name.</li>
<li>tableName: the target table identifier.</li>
</td>
<td>CALL sys.repair('test_db.T')</td>
<td>CALL sys.repair(`table` => 'test_db.T')</td>
</tr>
<tr>
<td>rewrite_file_index</td>
<td>
-- Use named argument<br/>
CALL sys.rewrite_file_index(&lt;`table` => identifier&gt; [, &lt;partitions => partitions&gt;])<br/><br/>
-- Use indexed argument<br/>
CALL sys.rewrite_file_index(&lt;identifier&gt; [, &lt;partitions&gt;])<br/><br/>
</td>
<td>
@@ -343,13 +426,16 @@ All available procedures are listed below.
</td>
<td>
-- rewrite the file index for the whole table<br/>
CALL sys.rewrite_file_index('test_db.T')<br/><br/>
CALL sys.rewrite_file_index(`table` => 'test_db.T')<br/><br/>
-- rewrite the file index for a specific partition<br/>
CALL sys.rewrite_file_index('test_db.T', 'pt=a')<br/><br/>
CALL sys.rewrite_file_index(`table` => 'test_db.T', partitions => 'pt=a')<br/><br/>
</td>
</tr>
<tr>
<td>create_branch</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.create_branch(`table` => 'identifier', branch => 'branchName', tag => 'tagName')<br/><br/>
-- Use indexed argument<br/>
-- based on the specified tag <br/>
CALL [catalog.]sys.create_branch('identifier', 'branchName', 'tagName')<br/><br/>
-- create empty branch <br/>
@@ -362,13 +448,16 @@ All available procedures are listed below.
<li>tagName: name of the tag which the new branch is based on.</li>
</td>
<td>
CALL sys.create_branch('default.T', 'branch1', 'tag1')<br/><br/>
CALL sys.create_branch('default.T', 'branch1')<br/><br/>
CALL sys.create_branch(`table` => 'default.T', branch => 'branch1', tag => 'tag1')<br/><br/>
CALL sys.create_branch(`table` => 'default.T', branch => 'branch1')<br/><br/>
</td>
</tr>
<tr>
<td>delete_branch</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.delete_branch(`table` => 'identifier', branch => 'branchName')<br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.delete_branch('identifier', 'branchName')
</td>
<td>
@@ -377,12 +466,15 @@ All available procedures are listed below.
<li>branchName: name of the branch to be deleted. If you specify multiple branches, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_branch('default.T', 'branch1')
CALL sys.delete_branch(`table` => 'default.T', branch => 'branch1')
</td>
</tr>
<tr>
<td>fast_forward</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.fast_forward(`table` => 'identifier', branch => 'branchName')<br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.fast_forward('identifier', 'branchName')
</td>
<td>
@@ -391,7 +483,7 @@ All available procedures are listed below.
<li>branchName: name of the branch to be merged.</li>
</td>
<td>
CALL sys.fast_forward('default.T', 'branch1')
CALL sys.fast_forward(`table` => 'default.T', branch => 'branch1')
</td>
</tr>
</tbody>
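
Named arguments, as the `merge_into`, `rollback_to`, and `expire_snapshots` entries above suggest, are only available on Flink 1.19 and later; Flink 1.18 supports indexed arguments only. A minimal sketch contrasting the two call styles for `sys.compact` (the table name, partition, and columns here are hypothetical):

```sql
-- Indexed arguments: positional, so every parameter up to the last one
-- you need must be spelled out.
CALL sys.compact('my_db.T', 'dt=20240101', 'zorder', 'col1,col2');

-- Named arguments (Flink 1.19+): the same call, with parameters passed
-- by name in any order; unused optional parameters are simply omitted.
CALL sys.compact(
    order_strategy => 'zorder',
    order_by => 'col1,col2',
    partitions => 'dt=20240101',
    `table` => 'my_db.T'
);
```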
18 changes: 18 additions & 0 deletions docs/content/flink/sql-query.md
@@ -204,6 +204,21 @@ First, you need to stop the streaming task using this consumer ID, and then exec

Run the following command:

{{< tabs "reset_consumer" >}}

{{< tab "Flink SQL" >}}

```sql
CALL sys.reset_consumer(
`table` => 'database_name.table_name',
consumer_id => 'consumer_id',
next_snapshot_id => <snapshot_id>
);
```
{{< /tab >}}

{{< tab "Flink Action" >}}

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
@@ -215,6 +230,9 @@ Run the following command:
[--next_snapshot <next-snapshot-id>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
{{< /tab >}}
{{< /tabs >}}
Please don't specify the --next_snapshot parameter if you want to delete the consumer.
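
The same shortcut works on the SQL side: calling `sys.reset_consumer` without `next_snapshot_id` deletes the consumer, per the procedures table above. A minimal sketch (the database, table, and consumer names are placeholders):

```sql
-- Omitting next_snapshot_id deletes the consumer instead of resetting it.
CALL sys.reset_consumer(
    `table` => 'database_name.table_name',
    consumer_id => 'consumer_id'
);
```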
33 changes: 33 additions & 0 deletions docs/content/learn-paimon/understand-files.md
@@ -245,6 +245,22 @@ is needed in order to reduce the number of small files.
Let's trigger the full-compaction now, and run a dedicated compaction job through `flink run`:

{{< label Batch >}}

{{< tabs "compact" >}}

{{< tab "Flink SQL" >}}
```sql
CALL sys.compact(
`table` => 'database_name.table_name',
partitions => 'partition_name',
order_strategy => 'order_strategy',
order_by => 'order_by',
options => 'paimon_table_dynamic_conf'
);
```
{{< /tab >}}

{{< tab "Flink Action" >}}
```bash
<FLINK_HOME>/bin/flink run \
-D execution.runtime-mode=batch \
@@ -257,15 +273,32 @@ Let's trigger the full-compaction now, and run a dedicated compaction job throug
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf>] ...]
```
{{< /tab >}}
{{< /tabs >}}
An example would be (suppose you're already in Flink's home directory):
{{< tabs "compact example" >}}
{{< tab "Flink SQL" >}}
```sql
CALL sys.compact('default.T');
```
{{< /tab >}}
{{< tab "Flink Action" >}}
```bash
./bin/flink run \
./lib/paimon-flink-action-{{< version >}}.jar \
compact \
--path file:///tmp/paimon/default.db/T
```
{{< /tab >}}
{{< /tabs >}}
All current table files will be compacted and a new snapshot, namely `snapshot-4`, is
made and contains the following information: