[doc] Document available flink procedures (#2455)
yuzelin authored Dec 5, 2023
1 parent 3c6ad33 commit 5f9d265
Showing 5 changed files with 195 additions and 7 deletions.
185 changes: 185 additions & 0 deletions docs/content/engines/flink.md
@@ -319,3 +319,188 @@ SELECT * FROM T;
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
```

## Procedures

Flink 1.18 and later versions support [Call Statements](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/call/),
which make it easier to manipulate the data and metadata of Paimon tables by writing SQL instead of submitting Flink jobs.
All available procedures are listed below. Note that when you call a procedure, you must pass all parameters in order;
if you don't want to pass some parameters, use `''` as a placeholder. For example, if you want to compact
table `default.t` with parallelism 4, but you don't want to specify partitions and sort strategy, the call statement
should be \
`CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')`.
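
For illustration, here is a sketch of a fully-specified call next to one that relies on `''` placeholders (the partition value, sort columns and option keys are only examples):

```sql
-- all optional parameters given explicitly
CALL sys.compact('default.t', 'dt=20231205', 'zorder', 'a,b', 'sink.parallelism=4');

-- skip partitions and sort strategy with '' placeholders, but keep the dynamic option
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4');
```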

Specify partitions: we use a string to represent the partition filter. "," means "AND" and ";" means "OR". For example, if you want
to specify two partitions date=01 and date=02, write 'date=01;date=02'; if you want to specify one partition
with date=01 and day=01, write 'date=01,day=01'.
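
For example, such a filter could be plugged into a `compact` call like this (the partition keys and values are only illustrative):

```sql
-- compact the partitions date=01 OR date=02
CALL sys.compact('default.t', 'date=01;date=02');

-- compact the single partition where date=01 AND day=01
CALL sys.compact('default.t', 'date=01,day=01');
```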

Table options syntax: we use a string to represent table options. The format is 'key1=value1,key2=value2...'.
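
For example, several dynamic options could be passed to `compact` in one string (the option keys and values here are only illustrative):

```sql
-- two dynamic options joined with ','
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4,write-buffer-size=256mb');
```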

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 4%">Procedure Name</th>
<th class="text-left" style="width: 4%">Usage</th>
<th class="text-left" style="width: 20%">Explaination</th>
<th class="text-left" style="width: 4%">Example</th>
</tr>
</thead>
<tbody style="font-size: 11px; ">
<tr>
<td>compact</td>
<td>
CALL [catalog.]sys.compact('identifier') <br/><br/>
CALL [catalog.]sys.compact('identifier', 'partitions') <br/><br/>
CALL [catalog.]sys.compact('identifier', 'partitions', 'order_strategy', 'order_columns', 'table_options')
</td>
<td>
To compact a table. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>partitions: partition filter.</li>
<li>order_strategy: 'order' or 'zorder' or 'none'. Leave it empty for 'none'.</li>
<li>order_columns: the columns to be sorted. Leave it empty if 'order_strategy' is 'none'.</li>
<li>table_options: additional dynamic options of the table.</li>
</td>
<td>
CALL sys.compact('default.T', 'p=0', 'zorder', 'a,b', 'sink.parallelism=4')
</td>
</tr>
<tr>
<td>compact_database</td>
<td>
CALL [catalog.]sys.compact_database() <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
</td>
<td>
To compact databases. Arguments:
<li>includingDatabases: to specify databases. You can use regular expressions.</li>
<li>mode: compact mode. "divided": start a sink for each table; detecting new tables requires restarting the job.
"combined" (default): start a single combined sink for all tables; new tables will be detected automatically.
</li>
<li>includingTables: to specify tables. You can use regular expressions.</li>
<li>excludingTables: to specify tables that are not compacted. You can use regular expressions.</li>
<li>tableOptions: additional dynamic options of the table.</li>
</td>
<td>
CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')
</td>
</tr>
<tr>
<td>create_tag</td>
<td>
CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId)
</td>
<td>
To create a tag based on the given snapshot. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>tagName: name of the new tag.</li>
<li>snapshotId (Long): id of the snapshot on which the new tag is based.</li>
</td>
<td>
CALL sys.create_tag('default.T', 'my_tag', 10)
</td>
</tr>
<tr>
<td>delete_tag</td>
<td>
CALL [catalog.]sys.delete_tag('identifier', 'tagName')
</td>
<td>
To delete a tag. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>tagName: name of the tag to be deleted.</li>
</td>
<td>
CALL sys.delete_tag('default.T', 'my_tag')
</td>
</tr>
<tr>
<td>merge_into</td>
<td>
-- when matched then upsert<br/>
CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedUpsertCondition','matchedUpsertSetting')<br/><br/>
-- when matched then upsert; when not matched then insert<br/>
CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedUpsertCondition','matchedUpsertSetting',<br/>
'notMatchedInsertCondition','notMatchedInsertValues')<br/><br/>
-- when matched then delete<br/>
CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedDeleteCondition')<br/><br/>
-- when matched then upsert + delete;<br/>
-- when not matched then insert<br/>
CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedUpsertCondition','matchedUpsertSetting',<br/>
'notMatchedInsertCondition','notMatchedInsertValues',<br/>
'matchedDeleteCondition')<br/><br/>
</td>
<td>
To perform "MERGE INTO" syntax. See <a href="/how-to/writing-tables#merging-into-table">merge_into action</a> for
details of arguments.
</td>
<td>
-- for matched order rows,<br/>
-- increase the price,<br/>
-- and if there is no match,<br/>
-- insert the order from<br/>
-- the source table<br/>
CALL sys.merge_into('default.T', '', '', 'default.S', 'T.id=S.order_id', '', 'price=T.price+20', '', '*')
</td>
</tr>
<tr>
<td>remove_orphan_files</td>
<td>
CALL [catalog.]sys.remove_orphan_files('identifier')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')
</td>
<td>
To remove orphan data files and metadata files. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>olderThan: to avoid deleting newly written files, this procedure only
deletes orphan files older than 1 day by default. This argument can be used to modify the interval.
</li>
</td>
<td>CALL sys.remove_orphan_files('default.T', '2023-10-31 12:00:00')</td>
</tr>
<tr>
<td>reset_consumer</td>
<td>
-- reset the consumer with a new next snapshot id<br/>
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)<br/><br/>
-- delete consumer<br/>
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId')
</td>
<td>
To reset or delete a consumer. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>consumerId: consumer to be reset or deleted.</li>
<li>nextSnapshotId (Long): the new next snapshot id of the consumer.</li>
</td>
<td>CALL sys.reset_consumer('default.T', 'myid', 10)</td>
</tr>
<tr>
<td>rollback_to</td>
<td>
-- rollback to a snapshot<br/>
CALL sys.rollback_to('identifier', snapshotId)<br/><br/>
-- rollback to a tag<br/>
CALL sys.rollback_to('identifier', 'tagName')
</td>
<td>
To roll back to a specific version of the target table. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>snapshotId (Long): id of the snapshot to roll back to.</li>
<li>tagName: name of the tag to roll back to.</li>
</td>
<td>CALL sys.rollback_to('default.T', 10)</td>
</tr>
</tbody>
</table>
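
As a reading aid, the `merge_into` example above can also be written with positional comments; this is the same statement as in the table (it assumes a source table `default.S`), not a new feature:

```sql
CALL sys.merge_into(
  'default.T',         -- identifier: target table
  '',                  -- targetAlias: not used
  '',                  -- sourceSqls: not used
  'default.S',         -- sourceTable
  'T.id=S.order_id',   -- mergeCondition
  '',                  -- matchedUpsertCondition: not used
  'price=T.price+20',  -- matchedUpsertSetting: increase the price
  '',                  -- notMatchedInsertCondition: not used
  '*'                  -- notMatchedInsertValues: insert the whole source row
);
```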
2 changes: 1 addition & 1 deletion docs/content/maintenance/manage-snapshots.md
@@ -325,7 +325,7 @@ To avoid deleting files that are newly added by other writing jobs, this action
{{< tab "Spark" >}}
```sql
CALL sys.remove_orphan_files(table => "tableId", [older_then => "2023-10-31 12:00:00"])
CALL sys.remove_orphan_files(table => "tableId", [older_than => "2023-10-31 12:00:00"])
```
{{< /tab >}}
@@ -37,7 +37,10 @@
* <pre><code>
* CALL sys.drop_partition('tableId', 'partition1', 'partition2', ...)
* </code></pre>
*
* @deprecated use ALTER TABLE DROP PARTITION
*/
@Deprecated
public class DropPartitionProcedure extends ProcedureBase {

public static final String IDENTIFIER = "drop_partition";
@@ -37,15 +37,15 @@
* Remove orphan files procedure. Usage:
*
* <pre><code>
* CALL sys.remove_orphan_files(table => 'tableId', [older_then => '2023-10-31 12:00:00'])
* CALL sys.remove_orphan_files(table => 'tableId', [older_than => '2023-10-31 12:00:00'])
* </code></pre>
*/
public class RemoveOrphanFilesProcedure extends BaseProcedure {

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("older_then", StringType)
ProcedureParameter.optional("older_than", StringType)
};

private static final StructType OUTPUT_TYPE =
@@ -51,22 +51,22 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row("Deleted=0") :: Nil)

val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
val older_then1 = DateTimeUtils.formatLocalDateTime(
val older_than1 = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(
orphanFile2ModTime -
TimeUnit.SECONDS.toMillis(1)),
3)

checkAnswer(
spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_then => '$older_then1')"),
spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"),
Row("Deleted=1") :: Nil)

val older_then2 = DateTimeUtils.formatLocalDateTime(
val older_than2 = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
3)

checkAnswer(
spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_then => '$older_then2')"),
spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2')"),
Row("Deleted=1") :: Nil)
}
}
