Skip to content

Commit

Permalink
[flink][procedure] Support named argument in procedures (apache#4087)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored Sep 3, 2024
1 parent db98432 commit 87a2840
Show file tree
Hide file tree
Showing 60 changed files with 3,835 additions and 1,007 deletions.
28 changes: 9 additions & 19 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,6 @@ All available procedures are listed below.
<tr>
<td>merge_into</td>
<td>
-- when matched then upsert<br/>
CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedUpsertCondition','matchedUpsertSetting')<br/><br/>
-- when matched then upsert; when not matched then insert<br/>
CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedUpsertCondition','matchedUpsertSetting',<br/>
'notMatchedInsertCondition','notMatchedInsertValues')<br/><br/>
-- when matched then delete<br/>
CALL [catalog].sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedDeleteCondition')<br/><br/>
-- when matched then upsert + delete;<br/>
-- when not matched then insert<br/>
CALL [catalog].sys.merge_into('identifier','targetAlias',<br/>
'sourceSqls','sourceTable','mergeCondition',<br/>
'matchedUpsertCondition','matchedUpsertSetting',<br/>
Expand All @@ -216,7 +201,12 @@ All available procedures are listed below.
-- and if there is no match,<br/>
-- insert the order from<br/>
-- the source table<br/>
CALL sys.merge_into('default.T', '', '', 'default.S', 'T.id=S.order_id', '', 'price=T.price+20', '', '*')
CALL sys.merge_into(<br/>
target_table => 'default.T',<br/>
source_table => 'default.S',<br/>
merge_condition => 'T.id=S.order_id',<br/>
matched_upsert_setting => 'price=T.price+20',<br/>
not_matched_insert_values => '*')<br/><br/>
</td>
</tr>
<tr>
Expand Down Expand Up @@ -259,17 +249,17 @@ All available procedures are listed below.
<td>rollback_to</td>
<td>
-- rollback to a snapshot<br/>
CALL sys.rollback_to('identifier', snapshotId)<br/><br/>
CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId)<br/><br/>
-- rollback to a tag<br/>
CALL sys.rollback_to('identifier', 'tagName')
CALL sys.rollback_to(`table` => 'identifier', tag => 'tagName')
</td>
<td>
To rollback to a specific version of target table. Argument:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>snapshotId (Long): id of the snapshot that will roll back to.</li>
<li>tagName: name of the tag that will roll back to.</li>
</td>
<td>CALL sys.rollback_to('default.T', 10)</td>
<td>CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)</td>
</tr>
<tr>
<td>expire_snapshots</td>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.flink.action.CompactDatabaseAction;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TimeUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Map;

import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;

/**
* Compact database procedure. Usage:
*
* <pre><code>
* -- NOTE: use '' as placeholder for optional arguments
*
* -- compact all databases
* CALL sys.compact_database()
*
* -- compact some databases (accept regular expression)
* CALL sys.compact_database('includingDatabases')
*
* -- set compact mode
* CALL sys.compact_database('includingDatabases', 'mode')
*
* -- compact some tables (accept regular expression)
* CALL sys.compact_database('includingDatabases', 'mode', 'includingTables')
*
* -- exclude some tables (accept regular expression)
* CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
*
* -- set table options ('k=v,...')
* CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
* </code></pre>
*/
public class CompactDatabaseProcedure extends ProcedureBase {

public static final String IDENTIFIER = "compact_database";

public String[] call(ProcedureContext procedureContext) throws Exception {
return call(procedureContext, "");
}

public String[] call(ProcedureContext procedureContext, String includingDatabases)
throws Exception {
return call(procedureContext, includingDatabases, "");
}

public String[] call(ProcedureContext procedureContext, String includingDatabases, String mode)
throws Exception {
return call(procedureContext, includingDatabases, mode, "");
}

public String[] call(
ProcedureContext procedureContext,
String includingDatabases,
String mode,
String includingTables)
throws Exception {
return call(procedureContext, includingDatabases, mode, includingTables, "");
}

public String[] call(
ProcedureContext procedureContext,
String includingDatabases,
String mode,
String includingTables,
String excludingTables)
throws Exception {
return call(
procedureContext, includingDatabases, mode, includingTables, excludingTables, "");
}

public String[] call(
ProcedureContext procedureContext,
String includingDatabases,
String mode,
String includingTables,
String excludingTables,
String tableOptions)
throws Exception {
return call(
procedureContext,
includingDatabases,
mode,
includingTables,
excludingTables,
tableOptions,
"");
}

public String[] call(
ProcedureContext procedureContext,
String includingDatabases,
String mode,
String includingTables,
String excludingTables,
String tableOptions,
String partitionIdleTime)
throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
CompactDatabaseAction action =
new CompactDatabaseAction(warehouse, catalogOptions)
.includingDatabases(nullable(includingDatabases))
.includingTables(nullable(includingTables))
.excludingTables(nullable(excludingTables))
.withDatabaseCompactMode(nullable(mode));
if (!StringUtils.isBlank(tableOptions)) {
action.withTableOptions(parseCommaSeparatedKeyValues(tableOptions));
}
if (!StringUtils.isBlank(partitionIdleTime)) {
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

return execute(procedureContext, action, "Compact database job");
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;

/**
* Create branch procedure for given tag. Usage:
*
* <pre><code>
* CALL sys.create_branch('tableId', 'branchName', 'tagName')
* </code></pre>
*/
public class CreateBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "create_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(
ProcedureContext procedureContext, String tableId, String branchName, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, tagName);
}

public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, null);
}

private String[] innerCall(String tableId, String branchName, String tagName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
if (!StringUtils.isBlank(tagName)) {
table.createBranch(branchName, tagName);
} else {
table.createBranch(branchName);
}
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.TimeUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import javax.annotation.Nullable;

import java.time.Duration;

/**
* Create tag procedure. Usage:
*
* <pre><code>
* CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
* </code></pre>
*/
public class CreateTagProcedure extends ProcedureBase {

public static final String IDENTIFIER = "create_tag";

public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, snapshotId, null);
}

public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, null, null);
}

public String[] call(
ProcedureContext procedureContext,
String tableId,
String tagName,
long snapshotId,
String timeRetained)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, snapshotId, timeRetained);
}

public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, String timeRetained)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, null, timeRetained);
}

private String[] innerCall(
String tableId,
String tagName,
@Nullable Long snapshotId,
@Nullable String timeRetained)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
if (snapshotId == null) {
table.createTag(tagName, toDuration(timeRetained));
} else {
table.createTag(tagName, snapshotId, toDuration(timeRetained));
}
return new String[] {"Success"};
}

@Nullable
private static Duration toDuration(@Nullable String s) {
if (s == null) {
return null;
}

return TimeUtils.parseDuration(s);
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Loading

0 comments on commit 87a2840

Please sign in to comment.