Skip to content

Commit

Permalink
[flink][procedure] Resolve compatibility for indexed argument in flin…
Browse files Browse the repository at this point in the history
…k 1.18
  • Loading branch information
yunfengzhou-hub committed Sep 3, 2024
1 parent 1bff0b3 commit f0dfb4c
Show file tree
Hide file tree
Showing 15 changed files with 1,773 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.flink.action.CompactDatabaseAction;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TimeUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Map;

import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;

/**
* Compact database procedure. Usage:
*
* <pre><code>
* -- NOTE: use '' as placeholder for optional arguments
*
* -- compact all databases
* CALL sys.compact_database()
*
* -- compact some databases (accept regular expression)
* CALL sys.compact_database('includingDatabases')
*
* -- set compact mode
* CALL sys.compact_database('includingDatabases', 'mode')
*
* -- compact some tables (accept regular expression)
* CALL sys.compact_database('includingDatabases', 'mode', 'includingTables')
*
* -- exclude some tables (accept regular expression)
* CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
*
* -- set table options ('k=v,...')
* CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
* </code></pre>
*/
public class CompactDatabaseProcedure extends ProcedureBase {

public static final String IDENTIFIER = "compact_database";

public String[] call(ProcedureContext procedureContext) throws Exception {
return call(procedureContext, "");
}

public String[] call(ProcedureContext procedureContext, String includingDatabases)
throws Exception {
return call(procedureContext, includingDatabases, "");
}

public String[] call(ProcedureContext procedureContext, String includingDatabases, String mode)
throws Exception {
return call(procedureContext, includingDatabases, mode, "");
}

public String[] call(
ProcedureContext procedureContext,
String includingDatabases,
String mode,
String includingTables)
throws Exception {
return call(procedureContext, includingDatabases, mode, includingTables, "");
}

public String[] call(
ProcedureContext procedureContext,
String includingDatabases,
String mode,
String includingTables,
String excludingTables)
throws Exception {
return call(
procedureContext, includingDatabases, mode, includingTables, excludingTables, "");
}

public String[] call(
ProcedureContext procedureContext,
String includingDatabases,
String mode,
String includingTables,
String excludingTables,
String tableOptions)
throws Exception {
return call(
procedureContext,
includingDatabases,
mode,
includingTables,
excludingTables,
tableOptions,
"");
}

public String[] call(
ProcedureContext procedureContext,
String includingDatabases,
String mode,
String includingTables,
String excludingTables,
String tableOptions,
String partitionIdleTime)
throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
CompactDatabaseAction action =
new CompactDatabaseAction(warehouse, catalogOptions)
.includingDatabases(nullable(includingDatabases))
.includingTables(nullable(includingTables))
.excludingTables(nullable(excludingTables))
.withDatabaseCompactMode(nullable(mode));
if (!StringUtils.isBlank(tableOptions)) {
action.withTableOptions(parseCommaSeparatedKeyValues(tableOptions));
}
if (!StringUtils.isBlank(partitionIdleTime)) {
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
}

return execute(procedureContext, action, "Compact database job");
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;

/**
* Create branch procedure for given tag. Usage:
*
* <pre><code>
* CALL sys.create_branch('tableId', 'branchName', 'tagName')
* </code></pre>
*/
public class CreateBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "create_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(
ProcedureContext procedureContext, String tableId, String branchName, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, tagName);
}

public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, null);
}

private String[] innerCall(String tableId, String branchName, String tagName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
if (!StringUtils.isBlank(tagName)) {
table.createBranch(branchName, tagName);
} else {
table.createBranch(branchName);
}
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.TimeUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import javax.annotation.Nullable;

import java.time.Duration;

/**
* Create tag procedure. Usage:
*
* <pre><code>
* CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
* </code></pre>
*/
public class CreateTagProcedure extends ProcedureBase {

public static final String IDENTIFIER = "create_tag";

public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, snapshotId, null);
}

public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, null, null);
}

public String[] call(
ProcedureContext procedureContext,
String tableId,
String tagName,
long snapshotId,
String timeRetained)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, snapshotId, timeRetained);
}

public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, String timeRetained)
throws Catalog.TableNotExistException {
return innerCall(tableId, tagName, null, timeRetained);
}

private String[] innerCall(
String tableId,
String tagName,
@Nullable Long snapshotId,
@Nullable String timeRetained)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
if (snapshotId == null) {
table.createTag(tagName, toDuration(timeRetained));
} else {
table.createTag(tagName, snapshotId, toDuration(timeRetained));
}
return new String[] {"Success"};
}

@Nullable
private static Duration toDuration(@Nullable String s) {
if (s == null) {
return null;
}

return TimeUtils.parseDuration(s);
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.PartitionPathUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import java.io.IOException;
import java.util.List;

import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* Partition mark done procedure. Usage:
*
* <pre><code>
* CALL sys.mark_partition_done('tableId', 'partition1', 'partition2', ...)
* </code></pre>
*/
public class MarkPartitionDoneProcedure extends ProcedureBase {

public static final String IDENTIFIER = "mark_partition_done";

public String[] call(
ProcedureContext procedureContext, String tableId, String... partitionStrings)
throws Catalog.TableNotExistException, IOException {
checkArgument(
partitionStrings.length > 0,
"mark_partition_done procedure must specify partitions.");

Identifier identifier = Identifier.fromString(tableId);
Table table = catalog.getTable(identifier);
checkArgument(
table instanceof FileStoreTable,
"Only FileStoreTable supports mark_partition_done procedure. The table type is '%s'.",
table.getClass().getName());

FileStoreTable fileStoreTable = (FileStoreTable) table;
CoreOptions coreOptions = fileStoreTable.coreOptions();
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);

List<String> partitionPaths =
PartitionPathUtils.generatePartitionPaths(
getPartitions(partitionStrings), fileStoreTable.store().partitionType());

markDone(partitionPaths, actions);

IOUtils.closeAllQuietly(actions);

return new String[] {"Success"};
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Loading

0 comments on commit f0dfb4c

Please sign in to comment.