[proposal] Reduce redundant code #9

Open · wants to merge 3 commits into base: 34200/rescale-itcase
@@ -367,21 +367,21 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
});
}

/** Wait for one more completed checkpoint. */
public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
throws Exception {
final long[] currentCheckpoint = new long[] {-1L};
/**
* Wait for a new completed checkpoint. The new checkpoint must have been triggered after
* waitForNewCheckpoint was called.
*/
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
final long startTime = System.currentTimeMillis();
waitUntilCondition(
() -> {
AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot != null) {
long currentCount = snapshot.getCounts().getNumberOfCompletedCheckpoints();
if (currentCheckpoint[0] < 0L) {
currentCheckpoint[0] = currentCount;
} else {
return currentCount > currentCheckpoint[0];
}
final CompletedCheckpointStats latestCompletedCheckpoint =
snapshot.getHistory().getLatestCompletedCheckpoint();
return latestCompletedCheckpoint != null
&& latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
} else if (graph.getState().isGloballyTerminalState()) {
checkState(
graph.getFailureInfo() != null,
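Note on the hunk above: the old helper remembered the first observed checkpoint count and returned as soon as the count grew, so it could be satisfied by a checkpoint that was already in flight before the test reached the wait. The new helper records a start timestamp and only returns once the latest completed checkpoint was triggered after that timestamp, which is the race behind FLINK-34200. Below is a minimal, generic sketch of the same wait pattern; the class, method, and supplier names are illustrative and not part of this PR.

```java
import java.util.function.LongSupplier;

/** Illustrative sketch of the "wait for a completion triggered after now" pattern. */
final class NewCompletionWaiter {

    /**
     * Polls until the supplier reports a completion whose trigger time is later than the
     * moment this method was entered; completions already in flight are ignored.
     */
    static void waitForNewCompletion(LongSupplier latestCompletedTriggerTime, long timeoutMillis)
            throws InterruptedException {
        final long startTime = System.currentTimeMillis();
        final long deadline = startTime + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (latestCompletedTriggerTime.getAsLong() > startTime) {
                return; // a completion was triggered after we started waiting
            }
            Thread.sleep(50); // poll interval is an arbitrary illustrative value
        }
        throw new IllegalStateException("no new completion within " + timeoutMillis + " ms");
    }
}
```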
@@ -62,6 +62,7 @@
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;

import org.junit.AfterClass;
import org.junit.Before;
@@ -86,7 +87,7 @@
import java.util.concurrent.TimeUnit;

import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForNewCheckpoint;
import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForAvailableSlots;
import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForRunningTasks;
import static org.junit.Assert.assertEquals;
@@ -163,6 +164,8 @@ public void setup() throws Exception {
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);

config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
// Disable the scaling cooldown to speed up the test
config.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(0));

// speed the test suite up
// - lower refresh interval -> controls how fast we invalidate ExecutionGraphCache
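The setup change above enables the adaptive scheduler and removes the scaling cooldown, so a rescale request is acted on immediately rather than after the default cooldown interval. A sketch of how these options would fit together in a test configuration follows; the helper class and method name are illustrative, while the option constants and values are taken from the diff.

```java
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

/** Illustrative helper building a configuration where rescaling takes effect immediately. */
final class FastRescalingConfig {

    static Configuration create() {
        final Configuration config = new Configuration();
        // The adaptive scheduler is what allows changing parallelism via resource requirements.
        config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        // Zero minimum scaling interval: no cooldown between a requirement change and the rescale.
        config.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(0));
        return config;
    }
}
```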
@@ -205,27 +208,49 @@ public void testCheckpointRescalingOutKeyedState() throws Exception {
* parallelism.
*/
public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception {
final int numberKeys = 42;
final int numberElements = 1000;
final int parallelism = scaleOut ? totalSlots / 2 : totalSlots;
final int parallelism2 = scaleOut ? totalSlots : totalSlots / 2;
final int initialParallelism = scaleOut ? totalSlots / 2 : totalSlots;
final int parallelismAfterRescaling = scaleOut ? totalSlots : totalSlots / 2;
final int maxParallelism = 13;
final int numberOfKeys = 42;
final int numberOfElements = 1000;

final JobGraph jobGraph =
createJobGraphWithKeyedState(
new Configuration(),
initialParallelism,
maxParallelism,
numberOfKeys,
numberOfElements);
testRescalingBasedOnCheckpoints(
jobGraph,
initialParallelism,
parallelismAfterRescaling,
maxParallelism,
numberOfKeys,
numberOfElements,
() -> {
waitForRunningTasks(
restClusterClient, jobGraph.getJobID(), 2 * parallelismAfterRescaling);
waitForAvailableSlots(
restClusterClient, totalSlots - parallelismAfterRescaling);
});
}

Duration timeout = Duration.ofMinutes(3);
Deadline deadline = Deadline.now().plus(timeout);
private void testRescalingBasedOnCheckpoints(
JobGraph jobGraph,
int initialParallelism,
int parallelismAfterRescaling,
int maxParallelism,
int numberKeys,
int numberElements,
ThrowingRunnable<? extends Exception> waitForJobAfterRescale)
throws Exception {
final Duration timeout = Duration.ofMinutes(3);
final Deadline deadline = Deadline.now().plus(timeout);

ClusterClient<?> client = cluster.getClusterClient();
final ClusterClient<?> client = cluster.getClusterClient();

try {

JobGraph jobGraph =
createJobGraphWithKeyedState(
new Configuration(),
parallelism,
maxParallelism,
numberKeys,
numberElements);

final JobID jobID = jobGraph.getJobID();

client.submitJob(jobGraph).get();
@@ -239,18 +264,16 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

// verify the current state

Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();

Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
final Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
final Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();

for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

expectedResult.add(
Tuple2.of(
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, parallelism, keyGroupIndex),
maxParallelism, initialParallelism, keyGroupIndex),
numberElements * key));
}

@@ -259,41 +282,43 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();

waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
// We need to wait for a completed checkpoint that was triggered after all the data was
// processed. This ensures all data has been flushed out of the operator's network
// buffers, so the test data is not processed a second time after the restore (see
// FLINK-34200).
waitForNewCheckpoint(jobID, cluster.getMiniCluster());

SubtaskIndexSource.SOURCE_LATCH.reset();

JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
for (JobVertex vertex : jobGraph.getVertices()) {
builder.setParallelismForJobVertex(vertex.getID(), parallelism2, parallelism2);
final int vertexParallelism =
Math.min(vertex.getMaxParallelism(), parallelismAfterRescaling);
builder.setParallelismForJobVertex(
vertex.getID(), vertexParallelism, vertexParallelism);
}

restClusterClient.updateJobResourceRequirements(jobID, builder.build()).join();

waitForRunningTasks(restClusterClient, jobID, 2 * parallelism2);
waitForAvailableSlots(restClusterClient, totalSlots - parallelism2);
waitForJobAfterRescale.run();

SubtaskIndexSource.SOURCE_LATCH.trigger();

client.requestJobResult(jobID).get();

Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();

Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
final Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
final Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();

for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
expectedResult2.add(
Tuple2.of(
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, parallelism2, keyGroupIndex),
maxParallelism, parallelismAfterRescaling, keyGroupIndex),
key * 2 * numberElements));
}

assertEquals(expectedResult2, actualResult2);

} finally {
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
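This hunk is the core of the deduplication: the shared flow (submit the job, verify the pre-rescale output, wait for a new checkpoint, update the resource requirements, verify the post-rescale output) now lives in testRescalingBasedOnCheckpoints, and each caller injects only the step that differs, the post-rescale wait, as a ThrowingRunnable. A self-contained sketch of that pattern is below; the class, method names, and print statements are illustrative stand-ins for the real wait conditions.

```java
import org.apache.flink.util.function.ThrowingRunnable;

/** Illustrative sketch of sharing a test skeleton and injecting only the varying step. */
final class SharedScenarioExample {

    /** Fixed steps wrap a caller-supplied step that differs per test. */
    static void runScenario(ThrowingRunnable<? extends Exception> waitForJobAfterRescale)
            throws Exception {
        // ... submit job, verify initial output, wait for a new checkpoint, request rescale ...
        waitForJobAfterRescale.run(); // the only part that differs between callers
        // ... verify output after rescaling, clean up ...
    }

    static void callers() throws Exception {
        // Caller A expects 2 * parallelism running tasks after the rescale.
        runScenario(() -> System.out.println("wait for 2 * parallelism running tasks"));
        // Caller B expects parallelism running tasks after the rescale.
        runScenario(() -> System.out.println("wait for parallelism running tasks"));
    }
}
```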
@@ -328,7 +353,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
waitForNewCheckpoint(jobID, cluster.getMiniCluster());

JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
for (JobVertex vertex : jobGraph.getVertices()) {
@@ -356,103 +381,33 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
*/
@Test
public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exception {
int numberKeys = 42;
int numberElements = 1000;
int parallelism = totalSlots / 2;
int parallelism2 = totalSlots;
int maxParallelism = 13;

Duration timeout = Duration.ofMinutes(3);
Deadline deadline = Deadline.now().plus(timeout);

ClusterClient<?> client = cluster.getClusterClient();

try {

JobGraph jobGraph =
createJobGraphWithKeyedAndNonPartitionedOperatorState(
parallelism,
maxParallelism,
parallelism,
numberKeys,
numberElements,
numberElements);

final JobID jobID = jobGraph.getJobID();

client.submitJob(jobGraph).get();

SubtaskIndexSource.SOURCE_LATCH.trigger();

// wait til the sources have emitted numberElements for each key and completed a
// checkpoint
assertTrue(
SubtaskIndexFlatMapper.workCompletedLatch.await(
deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

// verify the current state

Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();

Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();

for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

expectedResult.add(
Tuple2.of(
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, parallelism, keyGroupIndex),
numberElements * key));
}

assertEquals(expectedResult, actualResult);

// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());

SubtaskIndexSource.SOURCE_LATCH.reset();

JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
for (JobVertex vertex : jobGraph.getVertices()) {
if (vertex.getMaxParallelism() >= parallelism2) {
builder.setParallelismForJobVertex(vertex.getID(), parallelism2, parallelism2);
} else {
builder.setParallelismForJobVertex(
vertex.getID(), vertex.getMaxParallelism(), vertex.getMaxParallelism());
}
}

restClusterClient.updateJobResourceRequirements(jobID, builder.build()).join();

waitForRunningTasks(restClusterClient, jobID, parallelism2);
waitForAvailableSlots(restClusterClient, totalSlots - parallelism2);

SubtaskIndexSource.SOURCE_LATCH.trigger();

client.requestJobResult(jobID).get();

Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();

Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();

for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
expectedResult2.add(
Tuple2.of(
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, parallelism2, keyGroupIndex),
key * 2 * numberElements));
}

assertEquals(expectedResult2, actualResult2);

} finally {
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
}
final int initialParallelism = totalSlots / 2;
final int parallelismAfterRescaling = totalSlots;
final int maxParallelism = 13;
final int numberOfKeys = 42;
final int numberOfElements = 1000;

final JobGraph jobGraph =
createJobGraphWithKeyedAndNonPartitionedOperatorState(
initialParallelism,
maxParallelism,
initialParallelism,
numberOfKeys,
numberOfElements,
numberOfElements);
testRescalingBasedOnCheckpoints(
jobGraph,
initialParallelism,
parallelismAfterRescaling,
maxParallelism,
numberOfKeys,
numberOfElements,
() -> {
waitForRunningTasks(
restClusterClient, jobGraph.getJobID(), parallelismAfterRescaling);
waitForAvailableSlots(
restClusterClient, totalSlots - parallelismAfterRescaling);
});
}
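Both rescaling tests build their expected results from the same key-group bookkeeping: a key maps to a key group based on maxParallelism alone (this does not change when the job rescales), and the key group then maps to an operator index, which is what changes with the parallelism. A small runnable sketch of that computation is below; the parallelism values 4 and 8 are illustrative, only maxParallelism = 13 comes from the tests.

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

/** Illustrative: which subtask owns a key before and after rescaling. */
final class KeyOwnerExample {

    public static void main(String[] args) {
        final int maxParallelism = 13; // as in the tests above
        final int key = 7;             // any test key

        // The key group of a key never changes; it depends only on the key and maxParallelism.
        final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // The owning subtask is derived from the key group and the current parallelism,
        // so it may change when the job is rescaled (here 4 -> 8, illustrative values).
        final int ownerBefore =
                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 4, keyGroup);
        final int ownerAfter =
                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 8, keyGroup);

        System.out.printf(
                "key %d -> key group %d -> subtask %d before, subtask %d after rescaling%n",
                key, keyGroup, ownerBefore, ownerAfter);
    }
}
```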

@Test
@@ -513,7 +468,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
waitForNewCheckpoint(jobID, cluster.getMiniCluster());

JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
for (JobVertex vertex : jobGraph.getVertices()) {