Skip to content

Commit

Permalink
[improve]
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Sep 18, 2023
1 parent aff327a commit 410681a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.SortCompactAction;
Expand Down Expand Up @@ -48,12 +50,8 @@
*/
public class CompactProcedure extends ProcedureBase {

private final String warehouse;
private final Map<String, String> catalogOptions;

public CompactProcedure(String warehouse, Map<String, String> catalogOptions) {
this.warehouse = warehouse;
this.catalogOptions = catalogOptions;
public CompactProcedure(Catalog catalog) {
super(catalog);
}

public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
Expand All @@ -76,6 +74,8 @@ public String[] call(
String orderByColumns,
String... partitionStrings)
throws Exception {
String warehouse = ((AbstractCatalog) catalog).warehouse();
Map<String, String> catalogOptions = ((AbstractCatalog) catalog).options();
Identifier identifier = Identifier.fromString(tableId);
CompactAction action;
String jobName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;

import org.apache.flink.configuration.PipelineOptions;
Expand All @@ -32,6 +33,12 @@
/** Base implementation for flink {@link Procedure}. */
public class ProcedureBase implements Procedure {

protected final Catalog catalog;

ProcedureBase(Catalog catalog) {
this.catalog = catalog;
}

protected String[] execute(StreamExecutionEnvironment env, String defaultJobName)
throws Exception {
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.CompactActionFactory;

Expand All @@ -29,8 +28,6 @@
import java.util.List;
import java.util.Optional;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Utility methods for {@link Procedure}. */
public class ProcedureUtil {

Expand All @@ -47,15 +44,9 @@ public static List<String> listProcedures() {
}

public static Optional<Procedure> getProcedure(Catalog catalog, String procedureName) {
checkArgument(
catalog instanceof AbstractCatalog,
"Currently, only Paimon built-in AbstractCatalog supports procedure.");
AbstractCatalog abstractCatalog = (AbstractCatalog) catalog;
switch (procedureName) {
case CompactActionFactory.IDENTIFIER:
return Optional.of(
new CompactProcedure(
abstractCatalog.warehouse(), abstractCatalog.options()));
return Optional.of(new CompactProcedure(catalog));
default:
return Optional.empty();
}
Expand Down

0 comments on commit 410681a

Please sign in to comment.