Skip to content

Commit

Permalink
[fix] address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Sep 18, 2023
1 parent ae416b9 commit aff327a
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,14 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.SortCompactAction;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;

import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedures.Procedure;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;

/**
Expand All @@ -52,7 +46,7 @@
* CALL compact('tableId', '', '', partition1, partition2, ...)
* </code></pre>
*/
public class CompactProcedure implements Procedure {
public class CompactProcedure extends ProcedureBase {

private final String warehouse;
private final Map<String, String> catalogOptions;
Expand Down Expand Up @@ -119,14 +113,6 @@ public String[] call(
StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
action.build(env);

ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
String name = conf.getOptional(PipelineOptions.NAME).orElse(jobName);
if (conf.get(TABLE_DML_SYNC)) {
env.execute(name);
return new String[] {"Success"};
} else {
JobClient jobClient = env.executeAsync(name);
return new String[] {"JobID=" + jobClient.getJobID()};
}
return execute(env, jobName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;

import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.procedures.Procedure;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;

/** Base implementation for flink {@link Procedure}. */
public class ProcedureBase implements Procedure {

protected String[] execute(StreamExecutionEnvironment env, String defaultJobName)
throws Exception {
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
JobClient jobClient = env.executeAsync(name);
String jobId = jobClient.getJobID().toString();
if (conf.get(TABLE_DML_SYNC)) {
try {
jobClient.getJobExecutionResult().get();
} catch (Exception e) {
throw new TableException(String.format("Failed to wait job '%s' finish", jobId), e);
}
return new String[] {"Success"};
} else {
return new String[] {"JobID=" + jobId};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

Expand Down Expand Up @@ -157,7 +158,7 @@ protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
return env;
}

protected void callProcedure(String procedureStatement, boolean isStreaming) {
protected void callProcedure(String procedureStatement, boolean isStreaming, boolean dmlSync) {
StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);

TableEnvironment tEnv;
Expand All @@ -171,6 +172,8 @@ protected void callProcedure(String procedureStatement, boolean isStreaming) {
tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
}

tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, dmlSync);

tEnv.executeSql(
String.format(
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ private void callProcedure(boolean isStreaming) {
String.format(
"CALL compact('%s.%s', '', '', '%s', '%s')",
database, tableName, "dt=20221208,hh=15", "dt=20221209,hh=15"),
isStreaming);
isStreaming,
!isStreaming);
}

private List<Map<String, String>> getSpecifiedPartitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ private void callProcedure(String orderStrategy, List<String> orderByColumns) {
String.format(
"CALL compact('%s.%s', '%s', '%s')",
database, tableName, orderStrategy, String.join(",", orderByColumns)),
false);
false,
true);
}

public void createTable() throws Exception {
Expand Down

0 comments on commit aff327a

Please sign in to comment.