Skip to content

Commit

Permalink
1. fix the compatibility of procedure in flink 1.18.
Browse files Browse the repository at this point in the history
2. add document.
  • Loading branch information
wg1026688210 committed Jun 4, 2024
1 parent 1eb555f commit 6e2504b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ All available procedures are listed below.
<li>order_strategy(optional): 'order' or 'zorder' or 'hilbert' or 'none'.</li>
<li>order_by(optional): the columns need to be sort. Left empty if 'order_strategy' is 'none'.</li>
<li>options(optional): additional dynamic options of the table.</li>
<li>where(optional): partition predicate. Left empty for all partitions. (Can't be used together with "partitions").</li>
</td>
<td>
CALL sys.compact(`table` => 'default.T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ public String[] call(
return call(procedureContext, tableId, partitions, orderStrategy, orderByColumns, "", "");
}

public String[] call(
ProcedureContext procedureContext,
String tableId,
String partitions,
String orderStrategy,
String orderByColumns,
String tableOptions)
throws Exception {
return call(
procedureContext,
tableId,
partitions,
orderStrategy,
orderByColumns,
tableOptions,
"");
}

public String[] call(
ProcedureContext procedureContext,
String tableId,
Expand All @@ -78,6 +96,7 @@ public String[] call(
String tableOptions,
String whereSql)
throws Exception {

String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public void testCallCompact() {
.doesNotThrowAnyException();
assertThatCode(() -> sql("CALL sys.compact('default.T', 'pt=1', '', '')"))
.doesNotThrowAnyException();
assertThatCode(() -> sql("CALL sys.compact('default.T', '', '', '', 'sink.parallelism=1')"))
.doesNotThrowAnyException();
assertThatCode(
() ->
sql(
Expand Down

0 comments on commit 6e2504b

Please sign in to comment.