Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Set default recover strategy for TableEnvironment and StreamExecutionEnvironment in tests #3133

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.paimon.types.RowType;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -66,10 +65,12 @@ public class CdcActionITCaseBase extends ActionITCaseBase {

@BeforeEach
public void setEnv() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.noRestart());
env =
streamExecutionEnvironmentBuilder()
.streamingMode()
.parallelism(2)
.checkpointIntervalMs(1000)
.build();
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.flink.util.AbstractTestBase;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
Expand Down Expand Up @@ -103,9 +102,8 @@ protected void doStart() {

@BeforeEach
public void setup() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
env = streamExecutionEnvironmentBuilder().streamingMode().build();
tEnv = StreamTableEnvironment.create(env);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
tEnv.getConfig()
.getConfiguration()
.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -148,12 +147,12 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean unawareB
}

List<TestCdcEvent> events = mergeTestTableEvents(testTables);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointInterval(100);
if (!enableFailure) {
env.setRestartStrategy(RestartStrategies.noRestart());
}
StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder()
.streamingMode()
.checkpointIntervalMs(100)
.allowRestart(enableFailure)
.build();

TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(events);
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -145,12 +144,12 @@ private void innerTestRandomCdcEvents(
Collections.singletonList("pt"),
primaryKeys,
numBucket);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointInterval(100);
if (!enableFailure) {
env.setRestartStrategy(RestartStrategies.noRestart());
}
StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder()
.streamingMode()
.checkpointIntervalMs(100)
.allowRestart(enableFailure)
.build();

TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(testTable.events());
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,17 @@ public void open(FunctionContext context) throws Exception {
}

private Collection<RowData> lookup(RowData keyRow) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread()
.setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
try {
synchronized (function) {
return function.lookup(keyRow);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
Thread.currentThread().setContextClassLoader(cl);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
Expand All @@ -51,7 +50,6 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -60,8 +58,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;

/** ITCase for catalog. */
public abstract class CatalogITCaseBase extends AbstractTestBase {

Expand All @@ -71,7 +67,7 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {

@BeforeEach
public void before() throws IOException {
tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
tEnv = tableEnvironmentBuilder().batchMode().build();
String catalog = "PAIMON";
path = getTempDirPath();
String inferScan =
Expand All @@ -89,8 +85,7 @@ public void before() throws IOException {
.collect(Collectors.joining(","))));
tEnv.useCatalog(catalog);

sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
sEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
sEnv.useCatalog(catalog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.FailingFileIO;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -396,19 +395,16 @@ private static RowData srcRow(RowKind kind, int v, String p, int k) {
return wrap(GenericRowData.ofKind(kind, v, StringData.fromString(p), k));
}

public static StreamExecutionEnvironment buildStreamEnv() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(100);
env.setParallelism(2);
return env;
public StreamExecutionEnvironment buildStreamEnv() {
return streamExecutionEnvironmentBuilder()
.streamingMode()
.checkpointIntervalMs(100)
.parallelism(2)
.build();
}

public static StreamExecutionEnvironment buildBatchEnv() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(2);
return env;
public StreamExecutionEnvironment buildBatchEnv() {
return streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
}

public static FileStoreTable buildFileStoreTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.BlockingIterator;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -55,18 +53,16 @@ public class FileSystemCatalogITCase extends AbstractTestBase {
private static final String DB_NAME = "default";

private String path;
private StreamTableEnvironment tEnv;
private TableEnvironment tEnv;

@BeforeEach
public void setup() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(1);

tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig()
.getConfiguration()
.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
tEnv =
tableEnvironmentBuilder()
.streamingMode()
.parallelism(1)
.setConf(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
.build();
path = getTempDirPath();
tEnv.executeSql(
String.format("CREATE CATALOG fs WITH ('type'='paimon', 'warehouse'='%s')", path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
Expand Down Expand Up @@ -59,7 +56,7 @@ public abstract class FlinkTestBase extends AbstractTestBase {
protected ExpectedResult expectedResult;
protected boolean ignoreException;

protected StreamTableEnvironment tEnv;
protected TableEnvironment tEnv;
protected String rootPath;

protected ResolvedCatalogTable resolvedTable =
Expand All @@ -79,14 +76,11 @@ protected void prepareEnv(
this.ignoreException = ignoreException;
this.expectedResult = expectedResult;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().inBatchMode();
if (executionMode == RuntimeExecutionMode.STREAMING) {
env.enableCheckpointing(100);
builder.inStreamingMode();
tEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
} else {
tEnv = tableEnvironmentBuilder().batchMode().build();
}
tEnv = StreamTableEnvironment.create(env, builder.build());
rootPath = getTempDirPath();

tEnv.executeSql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ public void testAsyncRetryLookup(LookupCacheMode cacheMode) throws Exception {

String query =
"SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss',"
+ " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */"
+ " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='30') */"
+ " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
Expand All @@ -44,7 +43,7 @@ public class MappingTableITCase extends AbstractTestBase {

@BeforeEach
public void before() throws IOException {
tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
tEnv = tableEnvironmentBuilder().batchMode().build();
path = getTempDirPath();
}

Expand Down
Loading
Loading