[proposal] Reduce redundant code
XComp committed Feb 5, 2024
1 parent 7265b6b commit 41ec003
Showing 1 changed file with 82 additions and 135 deletions.
@@ -62,6 +62,7 @@
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingRunnable;

import org.junit.AfterClass;
import org.junit.Before;
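A note on the new import above: java.lang.Runnable cannot declare checked exceptions, so a lambda that calls throwing test utilities would need try/catch boilerplate inside the lambda body. Flink's ThrowingRunnable lifts the exception into the interface signature instead. A minimal sketch of the idea (the interface mirrors the shape of org.apache.flink.util.function.ThrowingRunnable; the demo class is illustrative, not part of the commit):

@FunctionalInterface
interface ThrowingRunnableSketch<E extends Throwable> {
    void run() throws E; // same shape as org.apache.flink.util.function.ThrowingRunnable
}

class ThrowingRunnableDemo {
    public static void main(String[] args) throws Exception {
        // Runnable bad = () -> Thread.sleep(10); // would not compile: checked InterruptedException
        ThrowingRunnableSketch<InterruptedException> ok = () -> Thread.sleep(10);
        ok.run(); // the checked exception propagates through main's throws clause
    }
}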
@@ -207,27 +208,49 @@ public void testCheckpointRescalingOutKeyedState() throws Exception {
* parallelism.
*/
public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception {
-        final int numberKeys = 42;
-        final int numberElements = 1000;
-        final int parallelism = scaleOut ? totalSlots / 2 : totalSlots;
-        final int parallelism2 = scaleOut ? totalSlots : totalSlots / 2;
+        final int initialParallelism = scaleOut ? totalSlots / 2 : totalSlots;
+        final int parallelismAfterRescaling = scaleOut ? totalSlots : totalSlots / 2;
         final int maxParallelism = 13;
+        final int numberOfKeys = 42;
+        final int numberOfElements = 1000;

+        final JobGraph jobGraph =
+                createJobGraphWithKeyedState(
+                        new Configuration(),
+                        initialParallelism,
+                        maxParallelism,
+                        numberOfKeys,
+                        numberOfElements);
+        testRescalingBasedOnCheckpoints(
+                jobGraph,
+                initialParallelism,
+                parallelismAfterRescaling,
+                maxParallelism,
+                numberOfKeys,
+                numberOfElements,
+                () -> {
+                    waitForRunningTasks(
+                            restClusterClient, jobGraph.getJobID(), 2 * parallelismAfterRescaling);
+                    waitForAvailableSlots(
+                            restClusterClient, totalSlots - parallelismAfterRescaling);
+                });
+    }
+
-        Duration timeout = Duration.ofMinutes(3);
-        Deadline deadline = Deadline.now().plus(timeout);
+    private void testRescalingBasedOnCheckpoints(
+            JobGraph jobGraph,
+            int initialParallelism,
+            int parallelismAfterRescaling,
+            int maxParallelism,
+            int numberKeys,
+            int numberElements,
+            ThrowingRunnable<? extends Exception> waitForJobAfterRescale)
+            throws Exception {
+        final Duration timeout = Duration.ofMinutes(3);
+        final Deadline deadline = Deadline.now().plus(timeout);

-        ClusterClient<?> client = cluster.getClusterClient();
+        final ClusterClient<?> client = cluster.getClusterClient();

try {

-            JobGraph jobGraph =
-                    createJobGraphWithKeyedState(
-                            new Configuration(),
-                            parallelism,
-                            maxParallelism,
-                            numberKeys,
-                            numberElements);
-
final JobID jobID = jobGraph.getJobID();

client.submitJob(jobGraph).get();
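The hunk above shows the core of the deduplication: both rescaling tests share the submit, checkpoint, rescale, and verify flow and differ only in how they wait for the rescaled job, so that step is injected as a lambda. A condensed sketch of the resulting shape (hypothetical names, not the full Flink test class):

class PatternSketch {
    interface ThrowingRunnable<E extends Throwable> {
        void run() throws E;
    }

    void sharedRescalingFlow(ThrowingRunnable<? extends Exception> waitForJobAfterRescale)
            throws Exception {
        // submit the job, wait for a fresh checkpoint, trigger the rescale (shared steps elided)
        waitForJobAfterRescale.run(); // the only caller-specific step
        // collect and verify the post-rescale results (shared steps elided)
    }

    void keyedStateTest() throws Exception {
        sharedRescalingFlow(() -> { /* wait for 2 * parallelismAfterRescaling running tasks */ });
    }

    void keyedAndNonPartitionedStateTest() throws Exception {
        sharedRescalingFlow(() -> { /* wait for parallelismAfterRescaling running tasks */ });
    }
}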
@@ -241,18 +264,16 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

             // verify the current state
-
-            Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
-
-            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
+            final Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
+            final Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();

for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

expectedResult.add(
Tuple2.of(
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
-                                    maxParallelism, parallelism, keyGroupIndex),
+                                    maxParallelism, initialParallelism, keyGroupIndex),
numberElements * key));
}
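The expected-result loop above relies on Flink's key-group arithmetic: a key is hashed to one of maxParallelism key groups, and each key group is owned by the subtask with index keyGroupId * parallelism / maxParallelism. A self-contained sketch of that mapping (the key-group assignment is simplified to a plain modulo here; the real KeyGroupRangeAssignment.assignToKeyGroup murmur-hashes the key's hashCode first):

import java.util.HashMap;
import java.util.Map;

class KeyGroupMathSketch {
    // Simplified stand-in for KeyGroupRangeAssignment.assignToKeyGroup.
    static int assignToKeyGroup(int key, int maxParallelism) {
        return key % maxParallelism; // real implementation hashes the key first
    }

    // Mirrors the formula behind KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int operatorIndex(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // With maxParallelism = 13 and parallelism = 2, key groups 0..6 map to
        // subtask 0 and key groups 7..12 to subtask 1 (integer division).
        Map<Integer, Integer> keyToSubtask = new HashMap<>();
        for (int key = 0; key < 42; key++) {
            int keyGroup = assignToKeyGroup(key, 13);
            keyToSubtask.put(key, operatorIndex(13, 2, keyGroup));
        }
        System.out.println(keyToSubtask);
    }
}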

@@ -261,47 +282,43 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();

-            waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
-
-            // We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
-            // This test will fail if the job recovers from a checkpoint triggered before
-            // `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
-            // `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
-            // `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from
-            // the checkpoint to be the count and sum of all data.
+            // We need to wait for a checkpoint that was triggered after all the data was
+            // processed. That ensures that the entire data set has been flushed out of the
+            // operator's network buffers, avoiding the reprocessing of test data after the
+            // restore (see FLINK-34200).
waitForNewCheckpoint(jobID, cluster.getMiniCluster());

SubtaskIndexSource.SOURCE_LATCH.reset();

-            JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+            final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
for (JobVertex vertex : jobGraph.getVertices()) {
-                builder.setParallelismForJobVertex(vertex.getID(), parallelism2, parallelism2);
+                final int vertexParallelism =
+                        Math.min(vertex.getMaxParallelism(), parallelismAfterRescaling);
+                builder.setParallelismForJobVertex(
+                        vertex.getID(), vertexParallelism, vertexParallelism);
}

restClusterClient.updateJobResourceRequirements(jobID, builder.build()).join();

-            waitForRunningTasks(restClusterClient, jobID, 2 * parallelism2);
-            waitForAvailableSlots(restClusterClient, totalSlots - parallelism2);
+            waitForJobAfterRescale.run();

SubtaskIndexSource.SOURCE_LATCH.trigger();

client.requestJobResult(jobID).get();

-            Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
-
-            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
+            final Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
+            final Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();

for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
expectedResult2.add(
Tuple2.of(
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
-                                    maxParallelism, parallelism2, keyGroupIndex),
+                                    maxParallelism, parallelismAfterRescaling, keyGroupIndex),
key * 2 * numberElements));
}

assertEquals(expectedResult2, actualResult2);

} finally {
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
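The Math.min capping introduced in the hunk above is what lets the second test (below) drop its explicit branching on vertex.getMaxParallelism(). A small sketch of the equivalence (illustrative names, not part of the commit):

class CapEquivalenceSketch {
    // The branching removed from the second test further down in this diff:
    static int oldStyle(int vertexMaxParallelism, int target) {
        if (vertexMaxParallelism >= target) {
            return target;
        }
        return vertexMaxParallelism;
    }

    // The branchless form used by the new shared helper:
    static int newStyle(int vertexMaxParallelism, int target) {
        return Math.min(vertexMaxParallelism, target);
    }

    public static void main(String[] args) {
        // Same result whether or not the vertex can reach the target parallelism:
        System.out.println(oldStyle(13, 8) + " == " + newStyle(13, 8)); // 8 == 8
        System.out.println(oldStyle(5, 8) + " == " + newStyle(5, 8)); // 5 == 5
    }
}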
@@ -364,103 +381,33 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
*/
@Test
public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exception {
-        int numberKeys = 42;
-        int numberElements = 1000;
-        int parallelism = totalSlots / 2;
-        int parallelism2 = totalSlots;
-        int maxParallelism = 13;
-
-        Duration timeout = Duration.ofMinutes(3);
-        Deadline deadline = Deadline.now().plus(timeout);
-
-        ClusterClient<?> client = cluster.getClusterClient();
-
-        try {
-
-            JobGraph jobGraph =
-                    createJobGraphWithKeyedAndNonPartitionedOperatorState(
-                            parallelism,
-                            maxParallelism,
-                            parallelism,
-                            numberKeys,
-                            numberElements,
-                            numberElements);
-
-            final JobID jobID = jobGraph.getJobID();
-
-            client.submitJob(jobGraph).get();
-
-            SubtaskIndexSource.SOURCE_LATCH.trigger();
-
-            // wait til the sources have emitted numberElements for each key and completed a
-            // checkpoint
-            assertTrue(
-                    SubtaskIndexFlatMapper.workCompletedLatch.await(
-                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
-
-            // verify the current state
-
-            Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
-
-            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
-
-            for (int key = 0; key < numberKeys; key++) {
-                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
-
-                expectedResult.add(
-                        Tuple2.of(
-                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
-                                        maxParallelism, parallelism, keyGroupIndex),
-                                numberElements * key));
-            }
-
-            assertEquals(expectedResult, actualResult);
-
-            // clear the CollectionSink set for the restarted job
-            CollectionSink.clearElementsSet();
-
-            waitForNewCheckpoint(jobID, cluster.getMiniCluster());
-
-            SubtaskIndexSource.SOURCE_LATCH.reset();
-
-            JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
-            for (JobVertex vertex : jobGraph.getVertices()) {
-                if (vertex.getMaxParallelism() >= parallelism2) {
-                    builder.setParallelismForJobVertex(vertex.getID(), parallelism2, parallelism2);
-                } else {
-                    builder.setParallelismForJobVertex(
-                            vertex.getID(), vertex.getMaxParallelism(), vertex.getMaxParallelism());
-                }
-            }
-
-            restClusterClient.updateJobResourceRequirements(jobID, builder.build()).join();
-
-            waitForRunningTasks(restClusterClient, jobID, parallelism2);
-            waitForAvailableSlots(restClusterClient, totalSlots - parallelism2);
-
-            SubtaskIndexSource.SOURCE_LATCH.trigger();
-
-            client.requestJobResult(jobID).get();
-
-            Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
-
-            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
-
-            for (int key = 0; key < numberKeys; key++) {
-                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
-                expectedResult2.add(
-                        Tuple2.of(
-                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
-                                        maxParallelism, parallelism2, keyGroupIndex),
-                                key * 2 * numberElements));
-            }
-
-            assertEquals(expectedResult2, actualResult2);
-
-        } finally {
-            // clear the CollectionSink set for the restarted job
-            CollectionSink.clearElementsSet();
-        }
+        final int initialParallelism = totalSlots / 2;
+        final int parallelismAfterRescaling = totalSlots;
+        final int maxParallelism = 13;
+        final int numberOfKeys = 42;
+        final int numberOfElements = 1000;
+
+        final JobGraph jobGraph =
+                createJobGraphWithKeyedAndNonPartitionedOperatorState(
+                        initialParallelism,
+                        maxParallelism,
+                        initialParallelism,
+                        numberOfKeys,
+                        numberOfElements,
+                        numberOfElements);
+        testRescalingBasedOnCheckpoints(
+                jobGraph,
+                initialParallelism,
+                parallelismAfterRescaling,
+                maxParallelism,
+                numberOfKeys,
+                numberOfElements,
+                () -> {
+                    waitForRunningTasks(
+                            restClusterClient, jobGraph.getJobID(), parallelismAfterRescaling);
+                    waitForAvailableSlots(
+                            restClusterClient, totalSlots - parallelismAfterRescaling);
+                });
}

@Test
