diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 7250b8bb5cb9..7175269ab27a 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -319,3 +319,188 @@ SELECT * FROM T;
 SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
 SELECT * FROM T;
 ```
+
+## Procedures
+
+Flink 1.18 and later versions support [Call Statements](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/call/),
+which make it easier to manipulate the data and metadata of a Paimon table by writing SQL instead of submitting Flink jobs.
+All available procedures are listed below. Note that when you call a procedure, you must pass all parameters in order;
+if you don't want to pass some parameters, you must use `''` as a placeholder. For example, to compact
+table `default.t` with parallelism 4 without specifying partitions or a sort strategy, the call statement
+should be \
+`CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')`.
+
+Specifying partitions: we use a string to represent a partition filter, where "," means "AND" and ";" means "OR". For example, to
+specify the two partitions date=01 and date=02, write 'date=01;date=02'; to specify one partition
+with date=01 and day=01, write 'date=01,day=01'.
+
+Table options syntax: we use a string to represent table options. The format is 'key1=value1,key2=value2...'.
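+
+For example, combining a partition filter with table options in a single call (the table name and partition
+values here are just placeholders):
+
+```sql
+-- compact only partitions date=01 and date=02, writing with a parallelism of 4
+CALL sys.compact('default.t', 'date=01;date=02', '', '', 'sink.parallelism=4');
+```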
+
+<table class="table table-bordered">
+   <thead>
+   <tr>
+      <th>Procedure Name</th>
+      <th>Usage</th>
+      <th>Explanation</th>
+      <th>Example</th>
+   </tr>
+   </thead>
+   <tbody>
+   <tr>
+      <td>compact</td>
+      <td>
+         CALL [catalog.]sys.compact('identifier')<br/><br/>
+         CALL [catalog.]sys.compact('identifier', 'partitions')<br/><br/>
+         CALL [catalog.]sys.compact('identifier', 'partitions', 'order_strategy', 'order_columns', 'table_options')
+      </td>
+      <td>
+         To compact a table. Arguments:
+         <li>identifier: the target table identifier. Cannot be empty.</li>
+         <li>partitions: partition filter (see "Specifying partitions" above).</li>
+         <li>order_strategy: 'order' or 'zorder' or 'none'.</li>
+         <li>order_columns: the columns to sort by. Left empty if 'order_strategy' is 'none'.</li>
+         <li>table_options: additional dynamic options of the table.</li>
+      </td>
+      <td>
+         CALL sys.compact('default.T', 'p=0', 'zorder', 'a,b', 'sink.parallelism=4')
+      </td>
+   </tr>
+   <tr>
+      <td>compact_database</td>
+      <td>
+         CALL [catalog.]sys.compact_database()<br/><br/>
+         CALL [catalog.]sys.compact_database('includingDatabases')<br/><br/>
+         CALL [catalog.]sys.compact_database('includingDatabases', 'mode')<br/><br/>
+         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables')<br/><br/>
+         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')<br/><br/>
+         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
+      </td>
+      <td>
+         To compact databases. Arguments:
+         <li>includingDatabases: to specify databases. You can use a regular expression.</li>
+         <li>mode: compact mode, 'divided' or 'combined'.</li>
+         <li>includingTables: to specify tables. You can use a regular expression.</li>
+         <li>excludingTables: to specify tables that are not compacted. You can use a regular expression.</li>
+         <li>tableOptions: additional dynamic options of the table.</li>
+      </td>
+      <td>
+         CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')
+      </td>
+   </tr>
+   <tr>
+      <td>create_tag</td>
+      <td>
+         CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId)
+      </td>
+      <td>
+         To create a tag based on the given snapshot. Arguments:
+         <li>identifier: the target table identifier. Cannot be empty.</li>
+         <li>tagName: name of the new tag.</li>
+         <li>snapshotId (Long): id of the snapshot which the new tag is based on.</li>
+      </td>
+      <td>
+         CALL sys.create_tag('default.T', 'my_tag', 10)
+      </td>
+   </tr>
+   <tr>
+      <td>delete_tag</td>
+      <td>
+         CALL [catalog.]sys.delete_tag('identifier', 'tagName')
+      </td>
+      <td>
+         To delete a tag. Arguments:
+         <li>identifier: the target table identifier. Cannot be empty.</li>
+         <li>tagName: name of the tag to be deleted.</li>
+      </td>
+      <td>
+         CALL sys.delete_tag('default.T', 'my_tag')
+      </td>
+   </tr>
+   <tr>
+      <td>merge_into</td>
+      <td>
+         -- when matched then upsert<br/>
+         CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
+         'sourceSqls','sourceTable','mergeCondition',<br/>
+         'matchedUpsertCondition','matchedUpsertSetting')<br/><br/>
+         -- when matched then upsert; when not matched then insert<br/>
+         CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
+         'sourceSqls','sourceTable','mergeCondition',<br/>
+         'matchedUpsertCondition','matchedUpsertSetting',<br/>
+         'notMatchedInsertCondition','notMatchedInsertValues')<br/><br/>
+         -- when matched then delete<br/>
+         CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
+         'sourceSqls','sourceTable','mergeCondition',<br/>
+         'matchedDeleteCondition')<br/><br/>
+         -- when matched then upsert + delete;<br/>
+         -- when not matched then insert<br/>
+         CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
+         'sourceSqls','sourceTable','mergeCondition',<br/>
+         'matchedUpsertCondition','matchedUpsertSetting',<br/>
+         'notMatchedInsertCondition','notMatchedInsertValues',<br/>
+         'matchedDeleteCondition')
+      </td>
+      <td>
+         To perform "MERGE INTO" syntax. See the merge_into action for
+         details of the arguments.
+      </td>
+      <td>
+         -- for matched order rows,<br/>
+         -- increase the price,<br/>
+         -- and if there is no match,<br/>
+         -- insert the order from<br/>
+         -- the source table<br/>
+         CALL sys.merge_into('default.T', '', '', 'default.S', 'T.id=S.order_id', '', 'price=T.price+20', '', '*')
+      </td>
+   </tr>
+   <tr>
+      <td>remove_orphan_files</td>
+      <td>
+         CALL [catalog.]sys.remove_orphan_files('identifier')<br/><br/>
+         CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')
+      </td>
+      <td>
+         To remove orphan data files and metadata files. Arguments:
+         <li>identifier: the target table identifier. Cannot be empty.</li>
+         <li>olderThan: timestamp; only orphan files older than this will be removed.</li>
+      </td>
+      <td>
+         CALL sys.remove_orphan_files('default.T', '2023-10-31 12:00:00')
+      </td>
+   </tr>
+   <tr>
+      <td>reset_consumer</td>
+      <td>
+         -- reset the new next snapshot id in the consumer<br/>
+         CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)<br/><br/>
+         -- delete consumer<br/>
+         CALL [catalog.]sys.reset_consumer('identifier', 'consumerId')
+      </td>
+      <td>
+         To reset or delete a consumer. Arguments:
+         <li>identifier: the target table identifier. Cannot be empty.</li>
+         <li>consumerId: the consumer to be reset or deleted.</li>
+         <li>nextSnapshotId (Long): the new next snapshot id of the consumer.</li>
+      </td>
+      <td>
+         CALL sys.reset_consumer('default.T', 'myid', 10)
+      </td>
+   </tr>
+   <tr>
+      <td>rollback_to</td>
+      <td>
+         -- rollback to a snapshot<br/>
+         CALL sys.rollback_to('identifier', snapshotId)<br/><br/>
+         -- rollback to a tag<br/>
+         CALL sys.rollback_to('identifier', 'tagName')
+      </td>
+      <td>
+         To roll back to a specific version of the target table. Arguments:
+         <li>identifier: the target table identifier. Cannot be empty.</li>
+         <li>snapshotId (Long): id of the snapshot to roll back to.</li>
+         <li>tagName: name of the tag to roll back to.</li>
+      </td>
+      <td>
+         CALL sys.rollback_to('default.T', 10)
+      </td>
+   </tr>
+   </tbody>
+</table>
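+
+For example, you can tag a snapshot and later roll the table back to it (assuming `default.T` already
+has a snapshot with id 10; the tag name is just a placeholder):
+
+```sql
+CALL sys.create_tag('default.T', 'before_change', 10);
+-- ... later, restore the table to the tagged state ...
+CALL sys.rollback_to('default.T', 'before_change');
+```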
  *  CALL sys.drop_partition('tableId', 'partition1', 'partition2', ...)
  *
+ *
+ * @deprecated use ALTER TABLE DROP PARTITION
  */
+@Deprecated
 public class DropPartitionProcedure extends ProcedureBase {
 
     public static final String IDENTIFIER = "drop_partition";
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index c5d5ff23eb89..c0639ba44d92 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -37,7 +37,7 @@
* Remove orphan files procedure. Usage:
*
*
- * CALL sys.remove_orphan_files(table => 'tableId', [older_then => '2023-10-31 12:00:00'])
+ * CALL sys.remove_orphan_files(table => 'tableId', [older_than => '2023-10-31 12:00:00'])
*
*/
public class RemoveOrphanFilesProcedure extends BaseProcedure {
@@ -45,7 +45,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
- ProcedureParameter.optional("older_then", StringType)
+ ProcedureParameter.optional("older_than", StringType)
};
private static final StructType OUTPUT_TYPE =
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index 5f8a58e785c1..4764daa6f88f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -51,22 +51,22 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row("Deleted=0") :: Nil)
val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
- val older_then1 = DateTimeUtils.formatLocalDateTime(
+ val older_than1 = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(
orphanFile2ModTime -
TimeUnit.SECONDS.toMillis(1)),
3)
checkAnswer(
- spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_then => '$older_then1')"),
+ spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"),
Row("Deleted=1") :: Nil)
- val older_then2 = DateTimeUtils.formatLocalDateTime(
+ val older_than2 = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
3)
checkAnswer(
- spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_then => '$older_then2')"),
+ spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2')"),
Row("Deleted=1") :: Nil)
}
}