diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index c3018600c99f..430b42c3bd7f 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -59,6 +59,16 @@ All available procedures are listed below.
compact |
+ -- Use named argument
+ CALL [catalog.]sys.compact(
+ `table` => 'table',
+ partitions => 'partitions',
+ order_strategy => 'order_strategy',
+ order_by => 'order_by',
+ options => 'options',
+ `where` => 'where',
+ partition_idle_time => 'partition_idle_time')
+ -- Use indexed argument
CALL [catalog.]sys.compact('table')
CALL [catalog.]sys.compact('table', 'partitions')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by')
@@ -86,12 +96,21 @@ All available procedures are listed below.
|
compact_database |
+ -- Use named argument
+ CALL [catalog.]sys.compact_database(
+ including_databases => 'includingDatabases',
+ mode => 'mode',
+ including_tables => 'includingTables',
+ excluding_tables => 'excludingTables',
+ table_options => 'tableOptions',
+ partition_idle_time => 'partitionIdleTime')
+ -- Use indexed argument
CALL [catalog.]sys.compact_database()
CALL [catalog.]sys.compact_database('includingDatabases')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
- CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
+ CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
|
@@ -106,12 +125,23 @@ All available procedures are listed below.
partition_idle_time: this is used to do a full compaction for partitions which have not received any new data within 'partition_idle_time'. Only these partitions will be compacted.
|
- CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')
+ CALL sys.compact_database(
+ including_databases => 'db1|db2',
+ mode => 'combined',
+ including_tables => 'table_.*',
+ excluding_tables => 'ignore',
+ table_options => 'sink.parallelism=4')
|
create_tag |
+ -- Use named argument
+ -- based on the specified snapshot
+ CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName', snapshot_id => snapshotId)
+ -- based on the latest snapshot
+ CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName')
+ -- Use indexed argument
-- based on the specified snapshot
CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId)
-- based on the latest snapshot
@@ -125,13 +155,16 @@ All available procedures are listed below.
time_retained: The maximum time retained for newly created tags.
|
- CALL sys.create_tag('default.T', 'my_tag', 10, '1 d')
+ CALL sys.create_tag(`table` => 'default.T', tag => 'my_tag', snapshot_id => cast(10 as bigint), time_retained => '1 d')
|
create_tag_from_timestamp |
-- Create a tag from the first snapshot whose commit-time is greater than the specified timestamp.
+ -- Use named argument
+ CALL [catalog.]sys.create_tag_from_timestamp(`table` => 'identifier', tag => 'tagName', timestamp => timestamp, time_retained => time_retained)
+ -- Use indexed argument
CALL [catalog.]sys.create_tag_from_timestamp('identifier', 'tagName', timestamp, time_retained)
|
@@ -152,6 +185,9 @@ All available procedures are listed below.
| create_tag_from_watermark |
-- Create a tag from the first snapshot whose watermark is greater than the specified timestamp.
+ -- Use named argument
+ CALL [catalog.]sys.create_tag_from_watermark(`table` => 'identifier', tag => 'tagName', watermark => watermark, time_retained => time_retained)
+ -- Use indexed argument
CALL [catalog.]sys.create_tag_from_watermark('identifier', 'tagName', watermark, time_retained)
|
@@ -171,6 +207,9 @@ All available procedures are listed below.
|
delete_tag |
+ -- Use named argument
+ CALL [catalog.]sys.delete_tag(`table` => 'identifier', tag => 'tagName')
+ -- Use indexed argument
CALL [catalog.]sys.delete_tag('identifier', 'tagName')
|
@@ -179,17 +218,30 @@ All available procedures are listed below.
tagName: name of the tag to be deleted. If you specify multiple tags, delimiter is ','.
|
- CALL sys.delete_tag('default.T', 'my_tag')
+ CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
|
merge_into |
+ -- for Flink 1.18
CALL [catalog].sys.merge_into('identifier','targetAlias',
'sourceSqls','sourceTable','mergeCondition',
'matchedUpsertCondition','matchedUpsertSetting',
'notMatchedInsertCondition','notMatchedInsertValues',
'matchedDeleteCondition')
+ -- for Flink 1.19 and later
+ CALL [catalog].sys.merge_into(
+ target_table => 'identifier',
+ target_alias => 'targetAlias',
+ source_sqls => 'sourceSqls',
+ source_table => 'sourceTable',
+ merge_condition => 'mergeCondition',
+ matched_upsert_condition => 'matchedUpsertCondition',
+ matched_upsert_setting => 'matchedUpsertSetting',
+ not_matched_insert_condition => 'notMatchedInsertCondition',
+ not_matched_insert_values => 'notMatchedInsertValues',
+ matched_delete_condition => 'matchedDeleteCondition')
|
To perform "MERGE INTO" syntax. See merge_into action for
@@ -201,6 +253,9 @@ All available procedures are listed below.
-- and if there is no match,
-- insert the order from
-- the source table
+ -- for Flink 1.18
+ CALL [catalog].sys.merge_into('default.T','','','default.S','T.id=S.order_id','','price=T.price+20','','*','')
+ -- for Flink 1.19 and later
CALL sys.merge_into(
target_table => 'default.T',
source_table => 'default.S',
@@ -212,6 +267,9 @@ All available procedures are listed below.
|
remove_orphan_files |
+ -- Use named argument
+ CALL [catalog.]sys.remove_orphan_files(`table` => 'identifier', older_than => 'olderThan', dry_run => 'dryRun')
+ -- Use indexed argument
CALL [catalog.]sys.remove_orphan_files('identifier')
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')
@@ -226,15 +284,18 @@ All available procedures are listed below.
dryRun: when true, view only orphan files, don't actually remove files. Default is false.
parallelism: The maximum number of concurrent file deletions. By default, it is the number of processors available to the Java virtual machine.
|
- CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')
- CALL remove_orphan_files('default.*', '2023-10-31 12:00:00')
- CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)
- CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', false, '5')
+ CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00')
+ CALL remove_orphan_files(`table` => 'default.*', older_than => '2023-10-31 12:00:00')
+ CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)
+ CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5')
|
reset_consumer |
+ -- Use named argument
+ CALL [catalog.]sys.reset_consumer(`table` => 'identifier', consumer_id => 'consumerId', next_snapshot_id => 'nextSnapshotId')
+ -- Use indexed argument
-- reset the new next snapshot id in the consumer
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)
-- delete consumer
@@ -246,11 +307,17 @@ All available procedures are listed below.
consumerId: consumer to be reset or deleted.
nextSnapshotId (Long): the new next snapshot id of the consumer.
|
- CALL sys.reset_consumer('default.T', 'myid', 10) |
+ CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint)) |
rollback_to |
+ -- for Flink 1.18
+ -- rollback to a snapshot
+ CALL sys.rollback_to('identifier', snapshotId)
+ -- rollback to a tag
+ CALL sys.rollback_to('identifier', 'tagName')
+ -- for Flink 1.19 and later
-- rollback to a snapshot
CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId)
-- rollback to a tag
@@ -262,11 +329,24 @@ All available procedures are listed below.
snapshotId (Long): id of the snapshot to roll back to.
tagName: name of the tag to roll back to.
|
- CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10) |
+
+ -- for Flink 1.18
+ CALL sys.rollback_to('default.T', 10)
+ -- for Flink 1.19 and later
+ CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
+ |
expire_snapshots |
+ -- Use named argument
+ CALL [catalog.]sys.expire_snapshots(
+ `table` => 'identifier',
+ retain_max => 'retain_max',
+ retain_min => 'retain_min',
+ older_than => 'older_than',
+ max_deletes => 'max_deletes')
+ -- Use indexed argument
-- for Flink 1.18
CALL sys.expire_snapshots(table, retain_max)
-- for Flink 1.19 and later
@@ -329,11 +409,14 @@ All available procedures are listed below.
databaseName : the target database name.
tableName: the target table identifier.
|
- CALL sys.repair('test_db.T') |
+ CALL sys.repair(`table` => 'test_db.T') |
rewrite_file_index |
+ -- Use named argument
+ CALL sys.rewrite_file_index(`table` => <identifier> [, partitions => <partitions>])
+ -- Use indexed argument
CALL sys.rewrite_file_index(<identifier> [, <partitions>])
|
@@ -343,13 +426,16 @@ All available procedures are listed below.
|
-- rewrite the file index for the whole table
- CALL sys.rewrite_file_index('test_db.T')
+ CALL sys.rewrite_file_index(`table` => 'test_db.T')
-- repair all tables in a specific partition
- CALL sys.rewrite_file_index('test_db.T', 'pt=a')
+ CALL sys.rewrite_file_index(`table` => 'test_db.T', partitions => 'pt=a')
|
create_branch |
+ -- Use named argument
+ CALL [catalog.]sys.create_branch(`table` => 'identifier', branch => 'branchName', tag => 'tagName')
+ -- Use indexed argument
-- based on the specified tag
CALL [catalog.]sys.create_branch('identifier', 'branchName', 'tagName')
-- create empty branch
@@ -362,13 +448,16 @@ All available procedures are listed below.
tagName: name of the tag which the new branch is based on.
|
- CALL sys.create_branch('default.T', 'branch1', 'tag1')
- CALL sys.create_branch('default.T', 'branch1')
+ CALL sys.create_branch(`table` => 'default.T', branch => 'branch1', tag => 'tag1')
+ CALL sys.create_branch(`table` => 'default.T', branch => 'branch1')
|
delete_branch |
+ -- Use named argument
+ CALL [catalog.]sys.delete_branch(`table` => 'identifier', branch => 'branchName')
+ -- Use indexed argument
CALL [catalog.]sys.delete_branch('identifier', 'branchName')
|
@@ -377,12 +466,15 @@ All available procedures are listed below.
branchName: name of the branch to be deleted. If you specify multiple branches, delimiter is ','.
|
- CALL sys.delete_branch('default.T', 'branch1')
+ CALL sys.delete_branch(`table` => 'default.T', branch => 'branch1')
|
fast_forward |
+ -- Use named argument
+ CALL [catalog.]sys.fast_forward(`table` => 'identifier', branch => 'branchName')
+ -- Use indexed argument
CALL [catalog.]sys.fast_forward('identifier', 'branchName')
|
@@ -391,7 +483,7 @@ All available procedures are listed below.
branchName: name of the branch to be merged.
|
- CALL sys.fast_forward('default.T', 'branch1')
+ CALL sys.fast_forward(`table` => 'default.T', branch => 'branch1')
|
diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md
index e58d98d62063..bb26d5d3c66a 100644
--- a/docs/content/flink/sql-query.md
+++ b/docs/content/flink/sql-query.md
@@ -204,6 +204,21 @@ First, you need to stop the streaming task using this consumer ID, and then exec
Run the following command:
+{{< tabs "reset_consumer" >}}
+
+{{< tab "Flink SQL" >}}
+
+```sql
+CALL sys.reset_consumer(
+ `table` => 'database_name.table_name',
+ consumer_id => 'consumer_id',
+    next_snapshot_id => <next_snapshot_id>
+);
+```
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
+
```bash
/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
@@ -215,6 +230,9 @@ Run the following command:
[--next_snapshot ] \
[--catalog_conf [--catalog_conf ...]]
```
+{{< /tab >}}
+
+{{< /tabs >}}
Please don't specify the --next_snapshot parameter if you want to delete the consumer.
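+
+For instance, a minimal Flink SQL sketch of deleting a consumer (the table and consumer names are placeholders; per the `reset_consumer` procedure reference, omitting `next_snapshot_id` deletes the consumer instead of resetting it):
+
+```sql
+-- delete the consumer entirely: no next_snapshot_id is passed
+CALL sys.reset_consumer(
+    `table` => 'database_name.table_name',
+    consumer_id => 'consumer_id'
+);
+```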
diff --git a/docs/content/learn-paimon/understand-files.md b/docs/content/learn-paimon/understand-files.md
index d9e437b0d4e1..b18c259993a7 100644
--- a/docs/content/learn-paimon/understand-files.md
+++ b/docs/content/learn-paimon/understand-files.md
@@ -245,6 +245,22 @@ is needed in order to reduce the number of small files.
Let's trigger the full-compaction now, and run a dedicated compaction job through `flink run`:
{{< label Batch >}}
+
+{{< tabs "compact" >}}
+
+{{< tab "Flink SQL" >}}
+```sql
+CALL sys.compact(
+ `table` => 'database_name.table_name',
+ partitions => 'partition_name',
+ order_strategy => 'order_strategy',
+ order_by => 'order_by',
+ options => 'paimon_table_dynamic_conf'
+);
+```
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
```bash
/bin/flink run \
-D execution.runtime-mode=batch \
@@ -257,15 +273,32 @@ Let's trigger the full-compaction now, and run a dedicated compaction job throug
[--catalog_conf [--catalog_conf ...]] \
[--table_conf [--table_conf ] ...]
```
+{{< /tab >}}
+
+{{< /tabs >}}
an example would be (suppose you're already in Flink home)
+{{< tabs "compact example" >}}
+
+{{< tab "Flink SQL" >}}
+
+```sql
+CALL sys.compact('T');
+```
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
+
```bash
./bin/flink run \
./lib/paimon-flink-action-{{< version >}}.jar \
compact \
--path file:///tmp/paimon/default.db/T
```
+{{< /tab >}}
+
+{{< /tabs >}}
All current table files will be compacted and a new snapshot, namely `snapshot-4`, is
made and contains the following information:
diff --git a/docs/content/maintenance/dedicated-compaction.md b/docs/content/maintenance/dedicated-compaction.md
index 819ce448bb1d..471bdad22275 100644
--- a/docs/content/maintenance/dedicated-compaction.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -81,6 +81,20 @@ To run a dedicated job for compaction, follow these instructions.
{{< tabs "dedicated-compaction-job" >}}
+{{< tab "Flink SQL" >}}
+
+Run the following sql:
+
+```sql
+CALL sys.compact(
+ `table` => 'default.T',
+ partitions => 'p=0',
+ options => 'sink.parallelism=4',
+ `where` => 'dt>10 and h<20'
+);
+```
+{{< /tab >}}
+
{{< tab "Flink Action Jar" >}}
Run the following command to submit a compaction job for the table.
@@ -128,25 +142,6 @@ For more usage of the compact action, see
{{< /tab >}}
-{{< tab "Flink SQL" >}}
-
-Run the following sql:
-
-```sql
--- compact table
-CALL sys.compact(`table` => 'default.T');
-
--- compact table with options
-CALL sys.compact(`table` => 'default.T', `options` => 'sink.parallelism=4');
-
--- compact table partition
-CALL sys.compact(`table` => 'default.T', `partitions` => 'p=0');
-
--- compact table partition with filter
-CALL sys.compact(`table` => 'default.T', `where` => 'dt>10 and h<20');
-```
-{{< /tab >}}
-
{{< /tabs >}}
{{< hint info >}}
@@ -160,6 +155,30 @@ You can run the following command to submit a compaction job for multiple databa
{{< tabs "database-compaction-job" >}}
+{{< tab "Flink SQL" >}}
+
+Run the following sql:
+
+```sql
+CALL sys.compact_database(
+ including_databases => 'includingDatabases',
+ mode => 'mode',
+ including_tables => 'includingTables',
+ excluding_tables => 'excludingTables',
+ table_options => 'tableOptions'
+)
+
+-- example
+CALL sys.compact_database(
+ including_databases => 'db1|db2',
+ mode => 'combined',
+ including_tables => 'table_.*',
+ excluding_tables => 'ignore',
+ table_options => 'sink.parallelism=4'
+)
+```
+{{< /tab >}}
+
{{< tab "Flink Action Jar" >}}
```bash
@@ -243,26 +262,6 @@ For more usage of the compact_database action, see
{{< /tab >}}
-{{< tab "Flink SQL" >}}
-
-Run the following sql:
-
-```sql
-CALL sys.compact_database('includingDatabases')
-
-CALL sys.compact_database('includingDatabases', 'mode')
-
-CALL sys.compact_database('includingDatabases', 'mode', 'includingTables')
-
-CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
-
-CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
-
--- example
-CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')
-```
-{{< /tab >}}
-
{{< /tabs >}}
## Sort Compact
@@ -273,6 +272,16 @@ you can trigger a compact with specified column sort to speed up queries.
{{< tabs "sort-compaction-job" >}}
+{{< tab "Flink SQL" >}}
+
+Run the following sql:
+
+```sql
+-- sort compact table
+CALL sys.compact(`table` => 'default.T', order_strategy => 'zorder', order_by => 'a,b')
+```
+{{< /tab >}}
+
{{< tab "Flink Action Jar" >}}
```bash
@@ -296,16 +305,6 @@ The sort parallelism is the same as the sink parallelism, you can dynamically sp
{{< /tab >}}
-{{< tab "Flink SQL" >}}
-
-Run the following sql:
-
-```sql
--- sort compact table
-CALL sys.compact(`table` => 'default.T', order_strategy => 'zorder', order_by => 'a,b')
-```
-{{< /tab >}}
-
{{< /tabs >}}
## Historical Partition Compact
@@ -324,6 +323,17 @@ This feature now is only used in batch mode.
This is for one table.
{{< tabs "history-partition-compaction-job for table" >}}
+{{< tab "Flink SQL" >}}
+
+Run the following sql:
+
+```sql
+-- history partition compact table
+CALL sys.compact(`table` => 'default.T', partition_idle_time => '1 d')
+```
+
+{{< /tab >}}
+
{{< tab "Flink Action Jar" >}}
```bash
@@ -347,23 +357,41 @@ There are one new configuration in `Historical Partition Compact`
{{< /tab >}}
+{{< /tabs >}}
+
+### For Databases
+
+This is for multiple tables in different databases.
+{{< tabs "history-partition-compaction-job for databases" >}}
+
{{< tab "Flink SQL" >}}
Run the following sql:
```sql
-- history partition compact table
-CALL sys.compact(`table` => 'default.T', 'partition_idle_time' => '1 d')
+CALL sys.compact_database(
+ including_databases => 'includingDatabases',
+ mode => 'mode',
+ including_tables => 'includingTables',
+ excluding_tables => 'excludingTables',
+ table_options => 'tableOptions',
+ partition_idle_time => 'partition_idle_time'
+);
```
-{{< /tab >}}
-
-{{< /tabs >}}
+Example: compact historical partitions for tables in a database
-### For Databases
+```sql
+-- history partition compact table
+CALL sys.compact_database(
+    including_databases => 'test_db',
+ mode => 'combined',
+ partition_idle_time => '1 d'
+);
+```
-This is for multiple tables in different databases.
-{{< tabs "history-partition-compaction-job for databases" >}}
+{{< /tab >}}
{{< tab "Flink Action Jar" >}}
@@ -398,22 +426,4 @@ Example: compact historical partitions for tables in database
{{< /tab >}}
-{{< tab "Flink SQL" >}}
-
-Run the following sql:
-
-```sql
--- history partition compact table
-CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partition_idle_time')
-```
-
-Example: compact historical partitions for tables in database
-
-```sql
--- history partition compact table
-CALL sys.compact_database('test_db', 'combined', '', '', '', '1 d')
-```
-
-{{< /tab >}}
-
{{< /tabs >}}
\ No newline at end of file
diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md
index 5b1fa7ba1d2b..b34a881b0f03 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -38,7 +38,7 @@ Paimon supports creating branch from a specific tag or snapshot, or just creatin
{{< tabs "create-branches" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
Run the following sql:
@@ -89,7 +89,7 @@ You can delete branch by its name.
{{< tabs "delete-branches" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
Run the following sql:
@@ -173,7 +173,7 @@ Fast-Forward the custom branch to main will delete all the snapshots, tags and s
{{< tabs "fast_forward" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
```sql
CALL sys.fast_forward('default.T', 'branch1');
diff --git a/docs/content/maintenance/manage-snapshots.md b/docs/content/maintenance/manage-snapshots.md
index d25124008d35..00c0322de8c2 100644
--- a/docs/content/maintenance/manage-snapshots.md
+++ b/docs/content/maintenance/manage-snapshots.md
@@ -232,7 +232,17 @@ Rollback a table to a specific snapshot ID.
{{< tabs "rollback-to" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
+
+Run the following command:
+
+```sql
+CALL sys.rollback_to(`table` => 'database_name.table_name', snapshot_id => <snapshot_id>);
+```
+
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
Run the following command:
@@ -298,9 +308,9 @@ submit a `remove_orphan_files` job to clean them:
{{< tab "Spark SQL/Flink SQL" >}}
```sql
-CALL sys.remove_orphan_files(table => "my_db.my_table", [older_than => "2023-10-31 12:00:00"])
+CALL sys.remove_orphan_files(`table` => "my_db.my_table", [older_than => "2023-10-31 12:00:00"])
-CALL sys.remove_orphan_files(table => "my_db.*", [older_than => "2023-10-31 12:00:00"])
+CALL sys.remove_orphan_files(`table` => "my_db.*", [older_than => "2023-10-31 12:00:00"])
```
{{< /tab >}}
diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md
index 6c78c0973f9c..5a5ad765d51c 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -96,7 +96,19 @@ You can create a tag with given name and snapshot ID.
{{< tabs "create-tag" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
+
+Run the following command:
+
+```sql
+CALL sys.create_tag(`table` => 'database_name.table_name', tag => 'tag_name', [snapshot_id => <snapshot_id>]);
+```
+
+If `snapshot_id` is unset, it defaults to the latest snapshot of the table.
+
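+For example, a minimal sketch of tagging the latest snapshot (database, table and tag names are placeholders):
+
+```sql
+-- no snapshot_id given, so the latest snapshot is tagged
+CALL sys.create_tag(`table` => 'database_name.table_name', tag => 'tag_name');
+```
+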
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
Run the following command:
@@ -160,7 +172,17 @@ You can delete a tag by its name.
{{< tabs "delete-tag" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
+
+Run the following command:
+
+```sql
+CALL sys.delete_tag(`table` => 'database_name.table_name', tag => 'tag_name');
+```
+
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
Run the following command:
@@ -211,7 +233,17 @@ the data will be deleted too).
{{< tabs "rollback-to" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
+
+Run the following command:
+
+```sql
+CALL sys.rollback_to(`table` => 'database_name.table_name', tag => 'tag_name');
+```
+
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
Run the following command:
diff --git a/docs/content/migration/clone-tables.md b/docs/content/migration/clone-tables.md
index 871baee6f2cb..36355dd85483 100644
--- a/docs/content/migration/clone-tables.md
+++ b/docs/content/migration/clone-tables.md
@@ -33,6 +33,46 @@ To clone a table, run the following command to submit a clone job.
If the table you clone is not modified at the same time, it is recommended to submit a Flink batch job for better performance.
However, if you want to clone the table while writing it at the same time, submit a Flink streaming job for automatic failure recovery.
+{{< tabs "clone" >}}
+
+{{< tab "Flink SQL" >}}
+
+```sql
+CALL sys.clone(
+    warehouse => 'source_warehouse_path',
+ [`database` => 'source_database_name',]
+ [`table` => 'source_table_name',]
+    target_warehouse => 'target_warehouse_path',
+ [target_database => 'target_database_name',]
+ [target_table => 'target_table_name',]
+    [parallelism => <parallelism>]
+);
+```
+
+{{< hint info >}}
+1. If `database` is not specified, all tables in all databases of the specified warehouse will be cloned.
+2. If `table` is not specified, all tables of the specified database will be cloned.
+{{< /hint >}}
+
+Example: Clone `test_db.test_table` from source warehouse to target warehouse.
+
+```sql
+CALL sys.clone(
+    warehouse => 's3:///path/to/warehouse_source',
+ `database` => 'test_db',
+ `table` => 'test_table',
+ catalog_conf => 's3.endpoint=https://****.com;s3.access-key=*****;s3.secret-key=*****',
+ target_warehouse => 's3:///path/to/warehouse_target',
+ target_database => 'test_db',
+ target_table => 'test_table',
+ target_catalog_conf => 's3.endpoint=https://****.com;s3.access-key=*****;s3.secret-key=*****'
+);
+```
+
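+To clone a whole database instead, `table` is simply omitted (see the hint above); a sketch under the same placeholder paths and credentials:
+
+```sql
+CALL sys.clone(
+    warehouse => 's3:///path/to/warehouse_source',
+    `database` => 'test_db',
+    catalog_conf => 's3.endpoint=https://****.com;s3.access-key=*****;s3.secret-key=*****',
+    target_warehouse => 's3:///path/to/warehouse_target',
+    target_database => 'test_db',
+    target_catalog_conf => 's3.endpoint=https://****.com;s3.access-key=*****;s3.secret-key=*****'
+);
+```
+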
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
+
```bash
/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
@@ -80,3 +120,7 @@ For more usage of the clone action, see
/path/to/paimon-flink-action-{{< version >}}.jar \
clone --help
```
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/content/migration/migration-from-hive.md b/docs/content/migration/migration-from-hive.md
index 8327e8a928ef..a40079fed84f 100644
--- a/docs/content/migration/migration-from-hive.md
+++ b/docs/content/migration/migration-from-hive.md
@@ -56,14 +56,18 @@ CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thri
USE CATALOG PAIMON;
-CALL sys.migrate_table('hive', 'default.hivetable', 'file.format=orc');
+CALL sys.migrate_table(connector => 'hive', source_table => 'default.hivetable', options => 'file.format=orc');
```
After invocation, "hivetable" will be totally converted to paimon format. Writing and reading the table the old "hive way" will fail.
We can add table properties while importing through the `options` argument of `sys.migrate_table`; multiple properties should be separated by ",". For example:
```sql
-CALL sys.migrate_table('hive', 'my_db.wait_to_upgrate', 'file.format=orc,read.batch-size=2096,write-only=true')
+CALL sys.migrate_table(
+ connector => 'hive',
+ source_table => 'my_db.wait_to_upgrate',
+ options => 'file.format=orc,read.batch-size=2096,write-only=true'
+);
```
If your flink version is below 1.17, you can use flink action to achieve this:
@@ -101,14 +105,18 @@ CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thri
USE CATALOG PAIMON;
-CALL sys.migrate_database('hive', 'default', 'file.format=orc');
+CALL sys.migrate_database(connector => 'hive', source_database => 'default', options => 'file.format=orc');
```
After invocation, all tables in the "default" database will be totally converted to paimon format. Writing and reading the tables the old "hive way" will fail.
We can add table properties while importing through the `options` argument of `sys.migrate_database`; multiple properties should be separated by ",". For example:
```sql
-CALL sys.migrate_database('hive', 'my_db', 'file.format=orc,read.batch-size=2096,write-only=true')
+CALL sys.migrate_database(
+ connector => 'hive',
+ source_database => 'my_db',
+ options => 'file.format=orc,read.batch-size=2096,write-only=true'
+);
```
If your flink version is below 1.17, you can use flink action to achieve this:
@@ -146,7 +154,7 @@ CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thri
USE CATALOG PAIMON;
-CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable');
+CALL sys.migrate_file(connector => 'hive', source_table => 'default.hivetable', target_table => 'default.paimontable');
```
After invocation, "hivetable" will disappear, and all files will be moved and renamed into the paimon directory. "paimontable" here must have the same
partition keys as "hivetable", and "paimontable" should be in unaware-bucket mode.
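+
+For illustration, a hypothetical target table sketch (column and partition names are made up here; setting 'bucket' = '-1' keeps the append table in unaware-bucket mode):
+
+```sql
+CREATE TABLE paimontable (
+    id INT,
+    dt STRING
+) PARTITIONED BY (dt) WITH (
+    'bucket' = '-1',        -- unaware-bucket mode
+    'file.format' = 'orc'
+);
+```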