Skip to content

Commit

Permalink
[FLINK-29050][test-utils] Use JUnit5 to re-write AbstractTestBase, Ja…
Browse files Browse the repository at this point in the history
…vaProgramTestBase MultipleProgramsTestBase and tag the corresponding old implementation classes as deprecated
  • Loading branch information
RocMarshal authored and 1996fanrui committed Apr 25, 2024
1 parent ffa639a commit 29a0455
Show file tree
Hide file tree
Showing 7 changed files with 445 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ public final class MiniClusterExtension

private InternalMiniClusterExtension internalMiniClusterExtension;

private TestEnvironment executionEnvironment;

public MiniClusterExtension() {
this(
new MiniClusterResourceConfiguration.Builder()
Expand Down Expand Up @@ -248,6 +250,10 @@ public void afterAll(ExtensionContext context) throws Exception {
}
}

public TestEnvironment getTestEnvironment() {
return this.executionEnvironment;
}

// Implementation

private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtension) {
Expand All @@ -263,6 +269,12 @@ private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtensi
new TestEnvironment(
internalMiniClusterExtension.getMiniCluster(), defaultParallelism, false);
executionEnvironment.setAsContext();
this.executionEnvironment =
new TestEnvironment(
internalMiniClusterExtension.getMiniCluster(),
internalMiniClusterExtension.getNumberSlots(),
false);
this.executionEnvironment.setAsContext();
TestStreamEnvironment.setAsContext(
internalMiniClusterExtension.getMiniCluster(), defaultParallelism);
}
Expand Down Expand Up @@ -292,6 +304,14 @@ public Configuration getClientConfiguration() {
return internalMiniClusterExtension.getClientConfiguration();
}

public Integer getNumberSlots() {
return internalMiniClusterExtension.getNumberSlots();
}

public boolean isRunning() {
return internalMiniClusterExtension.getMiniCluster().isRunning();
}

private static class CloseableParameter<T extends AutoCloseable>
implements ExtensionContext.Store.CloseableResource {
private final T autoCloseable;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.util;

import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.FileUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

/**
* Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. This
* saves a significant amount of time, since the startup and shutdown of the Flink clusters
* (including actor systems, etc) usually dominates the execution of the actual tests.
*
* <p>To write a unit test against this test base, simply extend it and add one or more regular test
* methods and retrieve the StreamExecutionEnvironment from the context:
*
* <pre>
* {@literal @}Test
* public void someTest() {
* ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
* // test code
* env.execute();
* }
*
* {@literal @}Test
* public void anotherTest() {
* StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
* // test code
* env.execute();
* }
*
* </pre>
*/
public abstract class AbstractTestBase {

private static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);

private static final int DEFAULT_PARALLELISM = 4;

@RegisterExtension
public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.build());

@TempDir protected File temporaryFolder;

@AfterEach
public final void cleanupRunningJobs(@InjectClusterClient MiniClusterClient clusterClient)
throws Exception {
if (!MINI_CLUSTER_EXTENSION.isRunning()) {
LOG.warn("Mini cluster is not running after the test!");
return;
}

for (JobStatusMessage path : clusterClient.listJobs().get()) {
if (!path.getJobState().isTerminalState()) {
try {
clusterClient.cancel(path.getJobId()).get();
} catch (Exception ignored) {
// ignore exceptions when cancelling dangling jobs
}
}
}
}

// --------------------------------------------------------------------------------------------
// Temporary File Utilities
// --------------------------------------------------------------------------------------------

public String getTempDirPath(String dirName) throws IOException {
File f = createAndRegisterTempFile(dirName);
return f.toURI().toString();
}

public String getTempFilePath(String fileName) throws IOException {
File f = createAndRegisterTempFile(fileName);
return f.toURI().toString();
}

public String createTempFile(String fileName, String contents) throws IOException {
File f = createAndRegisterTempFile(fileName);
if (!f.getParentFile().exists()) {
f.getParentFile().mkdirs();
}
f.createNewFile();
FileUtils.writeFileUtf8(f, contents);
return f.toURI().toString();
}

public File createAndRegisterTempFile(String fileName) throws IOException {
return new File(temporaryFolder, fileName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
* }
*
* </pre>
*
* @deprecated Use {@link AbstractTestBase} instead.
*/
@Deprecated
public abstract class AbstractTestBaseJUnit4 extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(AbstractTestBaseJUnit4.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.util;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.fail;

/**
* Base class for unit tests that run a single test with object reuse enabled/disabled and against
* collection environments.
*
* <p>To write a unit test against this test base, simply extend it and implement the {@link
* #testProgram()} method.
*
* <p>To skip the execution against collection environments you have to override {@link
* #skipCollectionExecution()}.
*/
public abstract class JavaProgramTestBase extends AbstractTestBase {

private JobExecutionResult latestExecutionResult;

/**
* The number of times a test should be repeated.
*
* <p>This is useful for runtime changes, which affect resource management. Running certain
* tests repeatedly might help to discover resource leaks, race conditions etc.
*/
private int numberOfTestRepetitions = 1;

private boolean isCollectionExecution;

public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
this.numberOfTestRepetitions = numberOfTestRepetitions;
}

public int getParallelism() {
return isCollectionExecution ? 1 : MINI_CLUSTER_EXTENSION.getNumberSlots();
}

public JobExecutionResult getLatestExecutionResult() {
return this.latestExecutionResult;
}

public boolean isCollectionExecution() {
return isCollectionExecution;
}

// --------------------------------------------------------------------------------------------
// Methods to create the test program and for pre- and post- test work
// --------------------------------------------------------------------------------------------

protected abstract void testProgram() throws Exception;

protected void preSubmit() throws Exception {}

protected void postSubmit() throws Exception {}

protected boolean skipCollectionExecution() {
return false;
}

// --------------------------------------------------------------------------------------------
// Test entry point
// --------------------------------------------------------------------------------------------

@Test
public void testJobWithObjectReuse() throws Exception {
isCollectionExecution = false;

preSubmit();

// This only works because the underlying ExecutionEnvironment is a TestEnvironment
// We should fix that we are able to get access to the latest execution result from a
// different
// execution environment and how the object reuse mode is enabled
TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment();
env.getConfig().enableObjectReuse();

// Possibly run the test multiple times
executeProgramMultipleTimes(env);
}

private void executeProgramMultipleTimes(ExecutionEnvironment env) throws Exception {
for (int i = 0; i < numberOfTestRepetitions; i++) {
try {
testProgram();
this.latestExecutionResult = env.getLastJobExecutionResult();
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Error while calling the test program: " + e.getMessage());
}

assertThat(this.latestExecutionResult)
.as("The test program never triggered an execution.")
.isNotNull();
}

postSubmit();
}

@Test
public void testJobWithoutObjectReuse() throws Exception {
isCollectionExecution = false;
preSubmit();

// This only works because the underlying ExecutionEnvironment is a TestEnvironment
// We should fix that we are able to get access to the latest execution result from a
// different
// execution environment and how the object reuse mode is enabled
ExecutionEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment();
env.getConfig().disableObjectReuse();

// Possibly run the test multiple times
executeProgramMultipleTimes(env);
}

@Test
public void testJobCollectionExecution() throws Exception {

// check if collection execution should be skipped.
if (this.skipCollectionExecution()) {
return;
}

isCollectionExecution = true;

preSubmit();
// prepare the test environment
CollectionTestEnvironment env = new CollectionTestEnvironment();
env.setAsContext();

// call the test program
try {
testProgram();
this.latestExecutionResult = env.getLastJobExecutionResult();
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Error while calling the test program: " + e.getMessage());
} finally {
MINI_CLUSTER_EXTENSION.getTestEnvironment().setAsContext();
}

assertThat(this.latestExecutionResult)
.as("The test program never triggered an execution.")
.isNotNull();

postSubmit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
*
* <p>To skip the execution against collection environments you have to override {@link
* #skipCollectionExecution()}.
*
* @deprecated Use {@link JavaProgramTestBase} instead.
*/
@Deprecated
public abstract class JavaProgramTestBaseJUnit4 extends AbstractTestBaseJUnit4 {

private JobExecutionResult latestExecutionResult;
Expand Down
Loading

0 comments on commit 29a0455

Please sign in to comment.