diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java index 1483873614661..db95d8e9a63aa 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java @@ -145,6 +145,8 @@ public final class MiniClusterExtension private InternalMiniClusterExtension internalMiniClusterExtension; + private TestEnvironment executionEnvironment; + public MiniClusterExtension() { this( new MiniClusterResourceConfiguration.Builder() @@ -248,6 +250,10 @@ public void afterAll(ExtensionContext context) throws Exception { } } + public TestEnvironment getTestEnvironment() { + return this.executionEnvironment; + } + // Implementation private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtension) { @@ -263,6 +269,12 @@ private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtensi new TestEnvironment( internalMiniClusterExtension.getMiniCluster(), defaultParallelism, false); executionEnvironment.setAsContext(); + this.executionEnvironment = + new TestEnvironment( + internalMiniClusterExtension.getMiniCluster(), + internalMiniClusterExtension.getNumberSlots(), + false); + this.executionEnvironment.setAsContext(); TestStreamEnvironment.setAsContext( internalMiniClusterExtension.getMiniCluster(), defaultParallelism); } @@ -292,6 +304,14 @@ public Configuration getClientConfiguration() { return internalMiniClusterExtension.getClientConfiguration(); } + public Integer getNumberSlots() { + return internalMiniClusterExtension.getNumberSlots(); + } + + public boolean isRunning() { + return internalMiniClusterExtension.getMiniCluster().isRunning(); + } + private static class CloseableParameter implements ExtensionContext.Store.CloseableResource { private final T autoCloseable; diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java new file mode 100644 index 0000000000000..b428d2e984c57 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.FileUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. This + * saves a significant amount of time, since the startup and shutdown of the Flink clusters + * (including actor systems, etc) usually dominates the execution of the actual tests. + * + *

To write a unit test against this test base, simply extend it and add one or more regular test + * methods and retrieve the StreamExecutionEnvironment from the context: + * + *

+ *   {@literal @}Test
+ *   public void someTest() {
+ *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * 
+ */ +public abstract class AbstractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class); + + private static final int DEFAULT_PARALLELISM = 4; + + @RegisterExtension + public static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .build()); + + @TempDir protected File temporaryFolder; + + @AfterEach + public final void cleanupRunningJobs(@InjectClusterClient MiniClusterClient clusterClient) + throws Exception { + if (!MINI_CLUSTER_EXTENSION.isRunning()) { + LOG.warn("Mini cluster is not running after the test!"); + return; + } + + for (JobStatusMessage path : clusterClient.listJobs().get()) { + if (!path.getJobState().isTerminalState()) { + try { + clusterClient.cancel(path.getJobId()).get(); + } catch (Exception ignored) { + // ignore exceptions when cancelling dangling jobs + } + } + } + } + + // -------------------------------------------------------------------------------------------- + // Temporary File Utilities + // -------------------------------------------------------------------------------------------- + + public String getTempDirPath(String dirName) throws IOException { + File f = createAndRegisterTempFile(dirName); + return f.toURI().toString(); + } + + public String getTempFilePath(String fileName) throws IOException { + File f = createAndRegisterTempFile(fileName); + return f.toURI().toString(); + } + + public String createTempFile(String fileName, String contents) throws IOException { + File f = createAndRegisterTempFile(fileName); + if (!f.getParentFile().exists()) { + f.getParentFile().mkdirs(); + } + f.createNewFile(); + FileUtils.writeFileUtf8(f, contents); + return f.toURI().toString(); + } + + public File createAndRegisterTempFile(String fileName) throws IOException { + return new File(temporaryFolder, fileName); + } +} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit4.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit4.java index dc3dfa761d6e6..498a755a71888 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit4.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit4.java @@ -56,7 +56,10 @@ * } * * + * + * @deprecated Use {@link AbstractTestBase} instead. */ +@Deprecated public abstract class AbstractTestBaseJUnit4 extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(AbstractTestBaseJUnit4.class); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java new file mode 100644 index 0000000000000..b503d0967297b --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + *

To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + *

To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBase extends AbstractTestBase { + + private JobExecutionResult latestExecutionResult; + + /** + * The number of times a test should be repeated. + * + *

This is useful for runtime changes, which affect resource management. Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ + private int numberOfTestRepetitions = 1; + + private boolean isCollectionExecution; + + public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { + this.numberOfTestRepetitions = numberOfTestRepetitions; + } + + public int getParallelism() { + return isCollectionExecution ? 1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); + } + + public JobExecutionResult getLatestExecutionResult() { + return this.latestExecutionResult; + } + + public boolean isCollectionExecution() { + return isCollectionExecution; + } + + // -------------------------------------------------------------------------------------------- + // Methods to create the test program and for pre- and post- test work + // -------------------------------------------------------------------------------------------- + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + protected boolean skipCollectionExecution() { + return false; + } + + // -------------------------------------------------------------------------------------------- + // Test entry point + // -------------------------------------------------------------------------------------------- + + @Test + public void testJobWithObjectReuse() throws Exception { + isCollectionExecution = false; + + preSubmit(); + + // This only works because the underlying ExecutionEnvironment is a TestEnvironment + // We should fix that we are able to get access to the latest execution result from a + // different + // execution environment and how the object reuse mode is enabled + TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment(); + env.getConfig().enableObjectReuse(); + + // Possibly run the test multiple times + executeProgramMultipleTimes(env); + } + + private void executeProgramMultipleTimes(ExecutionEnvironment env) throws Exception { + for (int i = 0; i < numberOfTestRepetitions; i++) { + try { + testProgram(); + this.latestExecutionResult = env.getLastJobExecutionResult(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Error while calling the test program: " + e.getMessage()); + } + + assertThat(this.latestExecutionResult) + .as("The test program never triggered an execution.") + .isNotNull(); + } + + postSubmit(); + } + + @Test + public void testJobWithoutObjectReuse() throws Exception { + isCollectionExecution = false; + preSubmit(); + + // This only works because the underlying ExecutionEnvironment is a TestEnvironment + // We should fix that we are able to get access to the latest execution result from a + // different + // execution environment and how the object reuse mode is enabled + ExecutionEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment(); + env.getConfig().disableObjectReuse(); + + // Possibly run the test multiple times + executeProgramMultipleTimes(env); + } + + @Test + public void testJobCollectionExecution() throws Exception { + + // check if collection execution should be skipped. + if (this.skipCollectionExecution()) { + return; + } + + isCollectionExecution = true; + + preSubmit(); + // prepare the test environment + CollectionTestEnvironment env = new CollectionTestEnvironment(); + env.setAsContext(); + + // call the test program + try { + testProgram(); + this.latestExecutionResult = env.getLastJobExecutionResult(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Error while calling the test program: " + e.getMessage()); + } finally { + MINI_CLUSTER_EXTENSION.getTestEnvironment().setAsContext(); + } + + assertThat(this.latestExecutionResult) + .as("The test program never triggered an execution.") + .isNotNull(); + + postSubmit(); + } +} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit4.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit4.java index 491f7359ea7db..98c888c6ad792 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit4.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit4.java @@ -33,7 +33,10 @@ * *

To skip the execution against collection environments you have to override {@link * #skipCollectionExecution()}. + * + * @deprecated Use {@link JavaProgramTestBase} instead. */ +@Deprecated public abstract class JavaProgramTestBaseJUnit4 extends AbstractTestBaseJUnit4 { private JobExecutionResult latestExecutionResult; diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java new file mode 100644 index 0000000000000..19628a09b32f1 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. This + * saves a significant amount of time, since the startup and shutdown of the Flink clusters + * (including actor systems, etc) usually dominates the execution of the actual tests. + * + *

To write a unit test against this test base, simply extend it and add one or more regular test + * methods and retrieve the ExecutionEnvironment from the context: + * + *

{@code
+ * {@literal @}Test
+ * public void someTest() {
+ *     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ *     // test code
+ *     env.execute();
+ * }
+ *
+ * {@literal @}Test
+ * public void anotherTest() {
+ *     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ *     // test code
+ *     env.execute();
+ * }
+ *
+ * }
+ */ +public class MultipleProgramsTestBase extends AbstractTestBase { + + /** + * Enum that defines which execution environment to run the next test on: An embedded local + * flink cluster, or the collection execution backend. + */ + public enum TestExecutionMode { + CLUSTER, + CLUSTER_OBJECT_REUSE, + COLLECTION, + } + + // ------------------------------------------------------------------------ + + @Parameter protected TestExecutionMode mode; + + // ------------------------------------------------------------------------ + // Environment setup & teardown + // ------------------------------------------------------------------------ + + @BeforeEach + public void setupEnvironment() { + TestEnvironment testEnvironment; + switch (mode) { + case CLUSTER: + // This only works because of the quirks we built in the TestEnvironment. + // We should refactor this in the future!!! + testEnvironment = MINI_CLUSTER_EXTENSION.getTestEnvironment(); + testEnvironment.getConfig().disableObjectReuse(); + testEnvironment.setAsContext(); + break; + case CLUSTER_OBJECT_REUSE: + // This only works because of the quirks we built in the TestEnvironment. + // We should refactor this in the future!!! + testEnvironment = MINI_CLUSTER_EXTENSION.getTestEnvironment(); + testEnvironment.getConfig().enableObjectReuse(); + testEnvironment.setAsContext(); + break; + case COLLECTION: + new CollectionTestEnvironment().setAsContext(); + break; + } + } + + @AfterEach + public void teardownEnvironment() { + switch (mode) { + case CLUSTER: + case CLUSTER_OBJECT_REUSE: + TestEnvironment.unsetAsContext(); + break; + case COLLECTION: + CollectionTestEnvironment.unsetAsContext(); + break; + } + } + + // ------------------------------------------------------------------------ + // Parametrization lets the tests run in cluster and collection mode + // ------------------------------------------------------------------------ + + @Parameters(name = "Execution mode = {0}") + public static Collection executionModes() { + return Arrays.asList(TestExecutionMode.CLUSTER, TestExecutionMode.COLLECTION); + } +} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBaseJUnit4.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBaseJUnit4.java index c0cda8071711b..40eac84b8c61c 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBaseJUnit4.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBaseJUnit4.java @@ -49,7 +49,10 @@ * } * * } + * + * @deprecated Use {@link MultipleProgramsTestBase} instead. */ +@Deprecated public class MultipleProgramsTestBaseJUnit4 extends AbstractTestBaseJUnit4 { /**