From 1488e1c027dc6c134ed0cd017ae0054bab236965 Mon Sep 17 00:00:00 2001 From: HunterXHunter Date: Wed, 30 Oct 2024 17:30:31 +0800 Subject: [PATCH] [flink] Procedure sorting compact a table without partitions filter. (#4405) --- docs/content/flink/procedures.md | 1 + .../paimon/flink/procedure/CompactProcedure.java | 12 ++++++++++++ .../ProcedurePositionalArgumentsITCase.java | 2 ++ 3 files changed, 15 insertions(+) diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 2ac9fb6258cb..52460e27e709 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -71,6 +71,7 @@ All available procedures are listed below. -- Use indexed argument
CALL [catalog.]sys.compact('table')

CALL [catalog.]sys.compact('table', 'partitions')

+ CALL [catalog.]sys.compact('table', 'order_strategy', 'order_by')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options')

CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where')

diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index c9be24404946..63aa6c906b94 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -44,6 +44,9 @@ * CALL sys.compact('tableId', 'pt1=A,pt2=a;pt1=B,pt2=b') * * -- compact a table with sorting + * CALL sys.compact('tableId', 'ORDER/ZORDER', 'col1,col2') + * + * -- compact specific partitions with sorting * CALL sys.compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2', 'sink.parallelism=6') * * @@ -61,6 +64,15 @@ public String[] call(ProcedureContext procedureContext, String tableId, String p return call(procedureContext, tableId, partitions, "", "", "", ""); } + public String[] call( + ProcedureContext procedureContext, + String tableId, + String orderStrategy, + String orderByColumns) + throws Exception { + return call(procedureContext, tableId, "", orderStrategy, orderByColumns, "", ""); + } + public String[] call( ProcedureContext procedureContext, String tableId, diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java index bb461dab6d1f..3eb1bf3c40e4 100644 --- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java @@ -56,6 +56,8 @@ public void testCompactDatabaseAndTable() { assertThatCode(() -> sql("CALL sys.compact('default.T')")).doesNotThrowAnyException(); assertThatCode(() -> sql("CALL sys.compact('default.T', 'pt=1')")) .doesNotThrowAnyException(); + assertThatCode(() -> sql("CALL sys.compact('default.T', '', '')")) + .doesNotThrowAnyException(); assertThatCode(() -> sql("CALL sys.compact('default.T', 'pt=1', '', '')")) .doesNotThrowAnyException(); assertThatCode(() -> sql("CALL sys.compact('default.T', '', '', '', 'sink.parallelism=1')"))