diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 2fd7b694b7373..ebcfc369c827a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -349,7 +349,7 @@ public int getRescaleOnFailedCheckpointCount() { } private final Settings settings; - private final RescaleManager.Factory rescaleManagerFactory; + private final StateTransitionManager.Factory stateTransitionManagerFactory; private final JobGraph jobGraph; @@ -427,7 +427,7 @@ public AdaptiveScheduler( throws JobExecutionException { this( settings, - DefaultRescaleManager.Factory.fromSettings(settings), + DefaultStateTransitionManager.Factory.fromSettings(settings), (metricGroup, checkpointStatsListener) -> new DefaultCheckpointStatsTracker( configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), @@ -455,7 +455,7 @@ public AdaptiveScheduler( @VisibleForTesting AdaptiveScheduler( Settings settings, - RescaleManager.Factory rescaleManagerFactory, + StateTransitionManager.Factory stateTransitionManagerFactory, BiFunction checkpointStatsTrackerFactory, JobGraph jobGraph, @@ -480,7 +480,7 @@ public AdaptiveScheduler( assertPreconditions(jobGraph); this.settings = settings; - this.rescaleManagerFactory = rescaleManagerFactory; + this.stateTransitionManagerFactory = stateTransitionManagerFactory; this.jobGraph = jobGraph; this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), jobGraph.getName()); @@ -1175,7 +1175,7 @@ public void goToExecuting( this, userCodeClassLoader, failureCollection, - rescaleManagerFactory, + stateTransitionManagerFactory, settings.getMinParallelismChangeForDesiredRescale(), settings.getRescaleOnFailedCheckpointCount())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java deleted file mode 100644 index 0b0fc013357e0..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.scheduler.adaptive; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.concurrent.FutureUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; - -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.Temporal; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -/** - * {@code DefaultRescaleManager} manages triggering the next rescaling based on when the previous - * rescale operation happened and the available resources. It handles the event based on the - * following phases (in that order): - * - *
    - *
  1. Cooldown phase: No rescaling takes place (its upper threshold is defined by {@code - * scalingIntervalMin}. - *
  2. Soft-rescaling phase: Rescaling is triggered if the desired amount of resources is - * available. - *
  3. Hard-rescaling phase: Rescaling is triggered if a sufficient amount of resources is - * available (its lower threshold is defined by (@code scalingIntervalMax}). - *
- * - *

Thread-safety: This class is not implemented in a thread-safe manner and relies on the fact - * that any method call happens within a single thread. - * - * @see Executing - */ -@NotThreadSafe -public class DefaultRescaleManager implements RescaleManager { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultRescaleManager.class); - - private final Temporal initializationTime; - private final Supplier clock; - - @VisibleForTesting final Duration scalingIntervalMin; - @VisibleForTesting @Nullable final Duration scalingIntervalMax; - - private final RescaleManager.Context rescaleContext; - - private boolean rescaleScheduled = false; - - @VisibleForTesting final Duration maxTriggerDelay; - - /** - * {@code triggerFuture} is used to allow triggering a scheduled callback. Rather than - * scheduling the callback itself, the callback is just chained with the future. The completion - * of the future is then scheduled which will, as a consequence, run the callback as part of the - * scheduled operation. - * - *

{@code triggerFuture} can be used to trigger the callback even earlier (before the - * scheduled delay has passed). See {@link #onTrigger()}. - */ - private CompletableFuture triggerFuture; - - DefaultRescaleManager( - Temporal initializationTime, - RescaleManager.Context rescaleContext, - Duration scalingIntervalMin, - @Nullable Duration scalingIntervalMax, - Duration maxTriggerDelay) { - this( - initializationTime, - Instant::now, - rescaleContext, - scalingIntervalMin, - scalingIntervalMax, - maxTriggerDelay); - } - - @VisibleForTesting - DefaultRescaleManager( - Temporal initializationTime, - Supplier clock, - RescaleManager.Context rescaleContext, - Duration scalingIntervalMin, - @Nullable Duration scalingIntervalMax, - Duration maxTriggerDelay) { - this.initializationTime = initializationTime; - this.clock = clock; - - this.maxTriggerDelay = maxTriggerDelay; - this.triggerFuture = FutureUtils.completedVoidFuture(); - - Preconditions.checkArgument( - scalingIntervalMax == null || scalingIntervalMin.compareTo(scalingIntervalMax) <= 0, - "scalingIntervalMax should at least match or be longer than scalingIntervalMin."); - this.scalingIntervalMin = scalingIntervalMin; - this.scalingIntervalMax = scalingIntervalMax; - - this.rescaleContext = rescaleContext; - } - - @Override - public void onChange() { - if (this.triggerFuture.isDone()) { - this.triggerFuture = scheduleOperationWithTrigger(this::evaluateChangeEvent); - } - } - - @Override - public void onTrigger() { - if (!this.triggerFuture.isDone()) { - this.triggerFuture.complete(null); - LOG.debug( - "A rescale trigger event was observed causing the rescale verification logic to be initiated."); - } else { - LOG.debug( - "A rescale trigger event was observed outside of a rescale cycle. No action taken."); - } - } - - private void evaluateChangeEvent() { - if (timeSinceLastRescale().compareTo(scalingIntervalMin) > 0) { - maybeRescale(); - } else if (!rescaleScheduled) { - rescaleScheduled = true; - rescaleContext.scheduleOperation(this::maybeRescale, scalingIntervalMin); - } - } - - private CompletableFuture scheduleOperationWithTrigger(Runnable callback) { - final CompletableFuture triggerFuture = new CompletableFuture<>(); - triggerFuture.thenRun(callback); - this.rescaleContext.scheduleOperation( - () -> triggerFuture.complete(null), this.maxTriggerDelay); - - return triggerFuture; - } - - private Duration timeSinceLastRescale() { - return Duration.between(this.initializationTime, clock.get()); - } - - private void maybeRescale() { - rescaleScheduled = false; - if (rescaleContext.hasDesiredResources()) { - LOG.info("Desired parallelism for job was reached: Rescaling will be triggered."); - rescaleContext.rescale(); - } else if (scalingIntervalMax != null) { - LOG.info( - "The longer the pipeline runs, the more the (small) resource gain is worth the restarting time. " - + "Last resource added does not meet the configured minimal parallelism change. Forced rescaling will be triggered after {} if the resource is still there.", - scalingIntervalMax); - - // reasoning for inconsistent scheduling: - // https://lists.apache.org/thread/m2w2xzfjpxlw63j0k7tfxfgs0rshhwwr - if (timeSinceLastRescale().compareTo(scalingIntervalMax) > 0) { - rescaleWithSufficientResources(); - } else { - rescaleContext.scheduleOperation( - this::rescaleWithSufficientResources, scalingIntervalMax); - } - } - } - - private void rescaleWithSufficientResources() { - if (rescaleContext.hasSufficientResources()) { - LOG.info( - "Resources for desired job parallelism couldn't be collected after {}: Rescaling will be enforced.", - scalingIntervalMax); - rescaleContext.rescale(); - } - } - - public static class Factory implements RescaleManager.Factory { - - private final Duration scalingIntervalMin; - @Nullable private final Duration scalingIntervalMax; - private final Duration maximumDelayForTrigger; - - /** - * Creates a {@code Factory} instance based on the {@link AdaptiveScheduler}'s {@code - * Settings} for rescaling. - */ - public static Factory fromSettings(AdaptiveScheduler.Settings settings) { - // it's not ideal that we use a AdaptiveScheduler internal class here. We might want to - // change that as part of a more general alignment of the rescaling configuration. - return new Factory( - settings.getScalingIntervalMin(), - settings.getScalingIntervalMax(), - settings.getMaximumDelayForTriggeringRescale()); - } - - private Factory( - Duration scalingIntervalMin, - @Nullable Duration scalingIntervalMax, - Duration maximumDelayForTrigger) { - this.scalingIntervalMin = scalingIntervalMin; - this.scalingIntervalMax = scalingIntervalMax; - this.maximumDelayForTrigger = maximumDelayForTrigger; - } - - @Override - public DefaultRescaleManager create(Context rescaleContext, Instant lastRescale) { - return new DefaultRescaleManager( - lastRescale, - rescaleContext, - scalingIntervalMin, - scalingIntervalMax, - maximumDelayForTrigger); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java new file mode 100644 index 0000000000000..aa1ea2c965d3c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.Temporal; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Supplier; + +/** + * {@code DefaultStateTransitionManager} is a state machine which manages the {@link + * AdaptiveScheduler}'s state transitions based on the previous transition time and the available + * resources. See {@link Phase} for details on each individual phase of this state machine. Note: We + * use the term phase here to avoid confusion with the state used in the {@link AdaptiveScheduler}. + * + *

+ * {@link Cooldown}
+ *   |
+ *   +--> {@link Idling}
+ *   |      |
+ *   |      V
+ *   +--> {@link Stabilizing}
+ *          |
+ *          +--> {@link Stabilized} --> {@link Idling}
+ *          |      |
+ *          |      V
+ *          \--> {@link Transitioning}
+ * 
+ * + *

Thread-safety: This class is not implemented in a thread-safe manner and relies on the fact + * that any method call happens within a single thread. + * + * @see Executing + */ +@NotThreadSafe +public class DefaultStateTransitionManager implements StateTransitionManager { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultStateTransitionManager.class); + + private final Supplier clock; + private final StateTransitionManager.Context transitionContext; + private Phase phase; + private final List> scheduledFutures; + + @VisibleForTesting final Duration cooldownTimeout; + @Nullable @VisibleForTesting final Duration resourceStabilizationTimeout; + @VisibleForTesting final Duration maxTriggerDelay; + + DefaultStateTransitionManager( + Temporal initializationTime, + StateTransitionManager.Context transitionContext, + Duration cooldownTimeout, + @Nullable Duration resourceStabilizationTimeout, + Duration maxTriggerDelay) { + this( + initializationTime, + Instant::now, + transitionContext, + cooldownTimeout, + resourceStabilizationTimeout, + maxTriggerDelay); + } + + @VisibleForTesting + DefaultStateTransitionManager( + Temporal initializationTime, + Supplier clock, + StateTransitionManager.Context transitionContext, + Duration cooldownTimeout, + @Nullable Duration resourceStabilizationTimeout, + Duration maxTriggerDelay) { + + this.clock = clock; + this.maxTriggerDelay = maxTriggerDelay; + this.cooldownTimeout = cooldownTimeout; + this.resourceStabilizationTimeout = resourceStabilizationTimeout; + this.transitionContext = transitionContext; + this.scheduledFutures = new ArrayList<>(); + this.phase = new Cooldown(initializationTime, clock, this, cooldownTimeout); + } + + @Override + public void onChange() { + phase.onChange(); + } + + @Override + public void onTrigger() { + phase.onTrigger(); + } + + @Override + public void close() { + scheduledFutures.forEach(future -> future.cancel(true)); + scheduledFutures.clear(); + } + + @VisibleForTesting + Phase getPhase() { + return phase; + } + + private void progressToIdling() { + progressToPhase(new Idling(clock, this)); + } + + private void progressToStabilizing(Temporal firstChangeEventTimestamp) { + progressToPhase( + new Stabilizing( + clock, + this, + resourceStabilizationTimeout, + firstChangeEventTimestamp, + maxTriggerDelay)); + } + + private void progressToStabilized(Temporal firstChangeEventTimestamp) { + progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, maxTriggerDelay)); + } + + private void triggerTransitionToSubsequentState() { + progressToPhase(new Transitioning(clock, this)); + transitionContext.transitionToSubsequentState(); + } + + private void progressToPhase(Phase newPhase) { + Preconditions.checkState( + !(phase instanceof Transitioning), + "The state transition operation has already been triggered."); + LOG.debug("Transitioning from {} to {}.", phase, newPhase); + phase = newPhase; + } + + @VisibleForTesting + void scheduleFromNow(Runnable callback, Duration delay, Phase phase) { + scheduledFutures.add( + transitionContext.scheduleOperation(() -> runIfPhase(phase, callback), delay)); + } + + private void runIfPhase(Phase expectedPhase, Runnable callback) { + if (getPhase() == expectedPhase) { + callback.run(); + } else { + LOG.debug( + "Ignoring scheduled action because expected phase {} is not the actual phase {}.", + expectedPhase, + getPhase()); + } + } + + /** Factory for creating {@link DefaultStateTransitionManager} instances. */ + public static class Factory implements StateTransitionManager.Factory { + + private final Duration cooldownTimeout; + @Nullable private final Duration resourceStabilizationTimeout; + private final Duration maximumDelayForTrigger; + + /** + * Creates a {@code Factory} instance based on the {@link AdaptiveScheduler}'s {@code + * Settings} for rescaling. + */ + public static Factory fromSettings(AdaptiveScheduler.Settings settings) { + // it's not ideal that we use a AdaptiveScheduler internal class here. We might want to + // change that as part of a more general alignment of the rescaling configuration. + return new Factory( + settings.getScalingIntervalMin(), + settings.getScalingIntervalMax(), + settings.getMaximumDelayForTriggeringRescale()); + } + + private Factory( + Duration cooldownTimeout, + @Nullable Duration resourceStabilizationTimeout, + Duration maximumDelayForTrigger) { + this.cooldownTimeout = cooldownTimeout; + this.resourceStabilizationTimeout = resourceStabilizationTimeout; + this.maximumDelayForTrigger = maximumDelayForTrigger; + } + + @Override + public DefaultStateTransitionManager create(Context context, Instant lastStateTransition) { + return new DefaultStateTransitionManager( + lastStateTransition, + context, + cooldownTimeout, + resourceStabilizationTimeout, + maximumDelayForTrigger); + } + } + + /** + * A phase in the state machine of the {@link DefaultStateTransitionManager}. Each phase is + * responsible for a specific part of the state transition process. + */ + @VisibleForTesting + abstract static class Phase { + + private final Supplier clock; + private final DefaultStateTransitionManager context; + + @VisibleForTesting + Phase(Supplier clock, DefaultStateTransitionManager context) { + this.clock = clock; + this.context = context; + } + + Temporal now() { + return clock.get(); + } + + DefaultStateTransitionManager context() { + return context; + } + + void scheduleRelativelyTo(Runnable callback, Temporal startOfTimeout, Duration timeout) { + final Duration passedTimeout = Duration.between(startOfTimeout, now()); + Preconditions.checkArgument( + !passedTimeout.isNegative(), + "The startOfTimeout ({}) should be in the past but is after the current time.", + startOfTimeout); + + final Duration timeoutLeft = timeout.minus(passedTimeout); + scheduleFromNow(callback, timeoutLeft.isNegative() ? Duration.ZERO : timeoutLeft); + } + + void scheduleFromNow(Runnable callback, Duration delay) { + context.scheduleFromNow(callback, delay, this); + } + + boolean hasDesiredResources() { + return context.transitionContext.hasDesiredResources(); + } + + boolean hasSufficientResources() { + return context.transitionContext.hasSufficientResources(); + } + + void onChange() {} + + void onTrigger() {} + } + + /** + * {@link Phase} to prevent any rescaling. {@link StateTransitionManager#onChange()} events will + * be monitored and forwarded to the next phase. {@link StateTransitionManager#onTrigger()} + * events will be ignored. + */ + @VisibleForTesting + static final class Cooldown extends Phase { + + @Nullable private Temporal firstChangeEventTimestamp; + + private Cooldown( + Temporal timeOfLastRescale, + Supplier clock, + DefaultStateTransitionManager context, + Duration cooldownTimeout) { + super(clock, context); + + this.scheduleRelativelyTo(this::finalizeCooldown, timeOfLastRescale, cooldownTimeout); + } + + @Override + void onChange() { + if (hasSufficientResources() && firstChangeEventTimestamp == null) { + firstChangeEventTimestamp = now(); + } + } + + private void finalizeCooldown() { + if (firstChangeEventTimestamp == null) { + context().progressToIdling(); + } else { + context().progressToStabilizing(firstChangeEventTimestamp); + } + } + } + + /** + * {@link Phase} which follows the {@link Cooldown} phase if no {@link + * StateTransitionManager#onChange()} was observed, yet. The {@code + * DefaultStateTransitionManager} waits for a first {@link StateTransitionManager#onChange()} + * event. {@link StateTransitionManager#onTrigger()} events will be ignored. + */ + @VisibleForTesting + static final class Idling extends Phase { + + private Idling(Supplier clock, DefaultStateTransitionManager context) { + super(clock, context); + } + + @Override + void onChange() { + if (hasSufficientResources()) { + context().progressToStabilizing(now()); + } + } + } + + /** + * {@link Phase} that handles the resources stabilization. In this phase, {@link + * StateTransitionManager#onTrigger()} will initiate rescaling if desired resources are met and + * {@link StateTransitionManager#onChange()} will schedule the evaluation of the desired + * resources. + */ + static final class Stabilizing extends Phase { + + private Temporal onChangeEventTimestamp; + private final Duration maxTriggerDelay; + private boolean evaluationScheduled = false; + + private Stabilizing( + Supplier clock, + DefaultStateTransitionManager context, + @Nullable Duration resourceStabilizationTimeout, + Temporal firstOnChangeEventTimestamp, + Duration maxTriggerDelay) { + super(clock, context); + this.onChangeEventTimestamp = firstOnChangeEventTimestamp; + this.maxTriggerDelay = maxTriggerDelay; + + if (resourceStabilizationTimeout != null) { + scheduleRelativelyTo( + () -> context().progressToStabilized(firstOnChangeEventTimestamp), + firstOnChangeEventTimestamp, + resourceStabilizationTimeout); + } + scheduleTransitionEvaluation(); + } + + @Override + void onChange() { + // schedule another desired-resource evaluation in scenarios where the previous change + // event was already handled by a onTrigger callback with a no-op + onChangeEventTimestamp = now(); + scheduleTransitionEvaluation(); + } + + @Override + void onTrigger() { + transitionToSubSequentStateForDesiredResources(); + } + + private void scheduleTransitionEvaluation() { + if (!evaluationScheduled) { + evaluationScheduled = true; + this.scheduleRelativelyTo( + () -> { + evaluationScheduled = false; + transitionToSubSequentStateForDesiredResources(); + }, + onChangeEventTimestamp, + maxTriggerDelay); + } + } + + private void transitionToSubSequentStateForDesiredResources() { + if (hasDesiredResources()) { + context().triggerTransitionToSubsequentState(); + } else { + LOG.debug( + "Desired resources are not met, skipping the transition to the subsequent state."); + } + } + } + + /** + * {@link Phase} that handles the post-stabilization phase. A {@link + * StateTransitionManager#onTrigger()} event initiates rescaling if sufficient resources are + * available; otherwise transitioning to {@link Idling} will be performed. + */ + @VisibleForTesting + static final class Stabilized extends Phase { + + private Stabilized( + Supplier clock, + DefaultStateTransitionManager context, + Temporal firstChangeEventTimestamp, + Duration maxTriggerDelay) { + super(clock, context); + this.scheduleRelativelyTo(this::onTrigger, firstChangeEventTimestamp, maxTriggerDelay); + } + + @Override + void onTrigger() { + if (hasSufficientResources()) { + context().triggerTransitionToSubsequentState(); + } else { + LOG.debug("Sufficient resources are not met, progressing to idling."); + context().progressToIdling(); + } + } + } + + /** + * In this final {@link Phase} no additional transition is possible: {@link + * StateTransitionManager#onChange()} and {@link StateTransitionManager#onTrigger()} events will + * be ignored. + */ + @VisibleForTesting + static final class Transitioning extends Phase { + private Transitioning(Supplier clock, DefaultStateTransitionManager context) { + super(clock, context); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index 1fcd23884f5a1..f7dcb8d97cfee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -58,13 +58,13 @@ /** State which represents a running job with an {@link ExecutionGraph} and assigned slots. */ class Executing extends StateWithExecutionGraph - implements ResourceListener, RescaleManager.Context, CheckpointStatsListener { + implements ResourceListener, StateTransitionManager.Context, CheckpointStatsListener { private final Context context; private final RescalingController sufficientResourcesController; private final RescalingController desiredResourcesController; - private final RescaleManager rescaleManager; + private final StateTransitionManager stateTransitionManager; private final int rescaleOnFailedCheckpointCount; // null indicates that there was no change event observed, yet @Nullable private AtomicInteger failedCheckpointCountdown; @@ -77,7 +77,7 @@ class Executing extends StateWithExecutionGraph Context context, ClassLoader userCodeClassLoader, List failureCollection, - RescaleManager.Factory rescaleManagerFactory, + StateTransitionManager.Factory stateTransitionManagerFactory, int minParallelismChangeForRescale, int rescaleOnFailedCheckpointCount, Instant lastRescale) { @@ -96,7 +96,7 @@ class Executing extends StateWithExecutionGraph this.sufficientResourcesController = new EnforceParallelismChangeRescalingController(); this.desiredResourcesController = new EnforceMinimalIncreaseRescalingController(minParallelismChangeForRescale); - this.rescaleManager = rescaleManagerFactory.create(this, lastRescale); + this.stateTransitionManager = stateTransitionManagerFactory.create(this, lastRescale); Preconditions.checkArgument( rescaleOnFailedCheckpointCount > 0, @@ -110,8 +110,8 @@ class Executing extends StateWithExecutionGraph context.runIfState( this, () -> { - rescaleManager.onChange(); - rescaleManager.onTrigger(); + stateTransitionManager.onChange(); + stateTransitionManager.onTrigger(); }, Duration.ZERO); } @@ -147,12 +147,12 @@ private static VertexParallelism extractCurrentVertexParallelism( } @Override - public void scheduleOperation(Runnable callback, Duration delay) { - context.runIfState(this, callback, delay); + public ScheduledFuture scheduleOperation(Runnable callback, Duration delay) { + return context.runIfState(this, callback, delay); } @Override - public void rescale() { + public void transitionToSubsequentState() { context.goToRestarting( getExecutionGraph(), getExecutionGraphHandler(), @@ -186,6 +186,12 @@ void onGloballyTerminalState(JobStatus globallyTerminalState) { context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph())); } + @Override + public void onLeave(Class newState) { + stateTransitionManager.close(); + super.onLeave(newState); + } + private void deploy() { for (ExecutionJobVertex executionJobVertex : getExecutionGraph().getVerticesTopologically()) { @@ -212,13 +218,13 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { - rescaleManager.onChange(); + stateTransitionManager.onChange(); initializeFailedCheckpointCountdownIfUnset(); } @Override public void onNewResourceRequirements() { - rescaleManager.onChange(); + stateTransitionManager.onChange(); initializeFailedCheckpointCountdownIfUnset(); } @@ -236,7 +242,7 @@ public void onFailedCheckpoint() { } private void triggerPotentialRescale() { - rescaleManager.onTrigger(); + stateTransitionManager.onTrigger(); this.failedCheckpointCountdown = null; } @@ -322,7 +328,7 @@ static class Factory implements StateFactory { private final OperatorCoordinatorHandler operatorCoordinatorHandler; private final ClassLoader userCodeClassLoader; private final List failureCollection; - private final RescaleManager.Factory rescaleManagerFactory; + private final StateTransitionManager.Factory stateTransitionManagerFactory; private final int minParallelismChangeForRescale; private final int rescaleOnFailedCheckpointCount; @@ -334,7 +340,7 @@ static class Factory implements StateFactory { Context context, ClassLoader userCodeClassLoader, List failureCollection, - RescaleManager.Factory rescaleManagerFactory, + StateTransitionManager.Factory stateTransitionManagerFactory, int minParallelismChangeForRescale, int rescaleOnFailedCheckpointCount) { this.context = context; @@ -344,7 +350,7 @@ static class Factory implements StateFactory { this.operatorCoordinatorHandler = operatorCoordinatorHandler; this.userCodeClassLoader = userCodeClassLoader; this.failureCollection = failureCollection; - this.rescaleManagerFactory = rescaleManagerFactory; + this.stateTransitionManagerFactory = stateTransitionManagerFactory; this.minParallelismChangeForRescale = minParallelismChangeForRescale; this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount; } @@ -362,7 +368,7 @@ public Executing getState() { context, userCodeClassLoader, failureCollection, - rescaleManagerFactory, + stateTransitionManagerFactory, minParallelismChangeForRescale, rescaleOnFailedCheckpointCount, Instant.now()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java similarity index 55% rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java index a2e53bbe0274e..a6a6aaaca2e9d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/RescaleManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java @@ -20,21 +20,30 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.ScheduledFuture; -/** The {@code RescaleManager} decides on whether rescaling should happen or not. */ -public interface RescaleManager { +/** + * The {@code StateTransitionManager} decides on whether {@link AdaptiveScheduler} state transition + * should happen or not. + */ +public interface StateTransitionManager { - /** Is called if the environment changed in a way that a rescaling could be considered. */ + /** + * Is called if the environment changed in a way that a state transition could be considered. + */ void onChange(); /** * Is called when any previous observed environment changes shall be verified possibly - * triggering a rescale operation. + * triggering a state transition operation. */ void onTrigger(); + /** Is called when the state transition manager should be closed. */ + default void close() {} + /** - * The interface that can be used by the {@code RescaleManager} to communicate with the + * The interface that can be used by the {@code StateTransitionManager} to communicate with the * underlying system. */ interface Context { @@ -51,20 +60,24 @@ interface Context { */ boolean hasDesiredResources(); - /** Triggers the rescaling of the job. */ - void rescale(); + /** Triggers the transition to the subsequent state of the {@link AdaptiveScheduler}. */ + void transitionToSubsequentState(); - /** Runs operation with a given delay in the underlying main thread. */ - void scheduleOperation(Runnable callback, Duration delay); + /** + * Runs operation with a given delay in the underlying main thread. + * + * @return a ScheduledFuture representing pending completion of the operation. + */ + ScheduledFuture scheduleOperation(Runnable callback, Duration delay); } - /** Interface for creating {@code RescaleManager} instances. */ + /** Interface for creating {@code StateTransitionManager} instances. */ interface Factory { /** - * Creates a {@code RescaleManager} instance for the given {@code rescaleContext} and - * previous rescale time. + * Creates a {@code StateTransitionManager} instance for the given {@code Context} and + * previous state transition time. */ - RescaleManager create(Context rescaleContext, Instant lastRescale); + StateTransitionManager create(Context context, Instant lastStateTransition); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index 433c6d66a9068..26bd64e68b8ef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -97,7 +97,7 @@ public class AdaptiveSchedulerBuilder { /** * {@code null} indicates that the default factory will be used based on the set configuration. */ - @Nullable private RescaleManager.Factory rescaleManagerFactory = null; + @Nullable private StateTransitionManager.Factory stateTransitionManagerFactory = null; private BiFunction checkpointStatsTrackerFactory = @@ -224,9 +224,9 @@ public AdaptiveSchedulerBuilder setSlotAllocator(SlotAllocator slotAllocator) { return this; } - public AdaptiveSchedulerBuilder setRescaleManagerFactory( - @Nullable RescaleManager.Factory rescaleManagerFactory) { - this.rescaleManagerFactory = rescaleManagerFactory; + public AdaptiveSchedulerBuilder setStateTransitionManagerFactory( + @Nullable StateTransitionManager.Factory stateTransitionManagerFactory) { + this.stateTransitionManagerFactory = stateTransitionManagerFactory; return this; } @@ -259,9 +259,9 @@ public AdaptiveScheduler build() throws Exception { AdaptiveScheduler.Settings.of(jobMasterConfiguration); return new AdaptiveScheduler( settings, - rescaleManagerFactory == null - ? DefaultRescaleManager.Factory.fromSettings(settings) - : rescaleManagerFactory, + stateTransitionManagerFactory == null + ? DefaultStateTransitionManager.Factory.fromSettings(settings) + : stateTransitionManagerFactory, checkpointStatsTrackerFactory, jobGraph, jobResourceRequirements, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index ab0b96fa765ed..5bc3d859c483d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -2306,8 +2306,8 @@ private AdaptiveScheduler createSchedulerThatReachesExecutingState( EXECUTOR_RESOURCE.getExecutor()) .setJobMasterConfiguration(config) .setDeclarativeSlotPool(slotPool) - .setRescaleManagerFactory( - new TestingRescaleManager.Factory( + .setStateTransitionManagerFactory( + new TestingStateTransitionManager.Factory( () -> {}, () -> { singleThreadMainThreadExecutor diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java deleted file mode 100644 index 94e6a57ea093a..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManagerTest.java +++ /dev/null @@ -1,675 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.scheduler.adaptive; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.util.ConfigurationException; - -import org.junit.jupiter.api.Test; - -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -class DefaultRescaleManagerTest { - - @Test - void testProperConfiguration() throws ConfigurationException { - final Duration scalingIntervalMin = Duration.ofMillis(1337); - final Duration scalingIntervalMax = Duration.ofMillis(7331); - final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242); - - final Configuration configuration = new Configuration(); - configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, scalingIntervalMin); - configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, scalingIntervalMax); - configuration.set( - JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER, maximumDelayForRescaleTrigger); - - final DefaultRescaleManager testInstance = - DefaultRescaleManager.Factory.fromSettings( - AdaptiveScheduler.Settings.of(configuration)) - .create(TestingRescaleManagerContext.stableContext(), Instant.now()); - assertThat(testInstance.scalingIntervalMin).isEqualTo(scalingIntervalMin); - assertThat(testInstance.scalingIntervalMax).isEqualTo(scalingIntervalMax); - assertThat(testInstance.maxTriggerDelay).isEqualTo(maximumDelayForRescaleTrigger); - } - - @Test - void testInvalidConfiguration() { - final Duration cooldownThreshold = Duration.ofMinutes(2); - final TestingRescaleManagerContext ctx = TestingRescaleManagerContext.stableContext(); - assertThatThrownBy( - () -> - new DefaultRescaleManager( - Instant.now(), - ctx, - cooldownThreshold, - cooldownThreshold.minusNanos(1), - Duration.ofHours(5))) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - void triggerWithoutChangeEventNoopInCooldownPhase() { - triggerWithoutChangeEventNoop( - TestingRescaleManagerContext::createTestInstanceInCooldownPhase); - } - - @Test - void triggerWithoutChangeEventNoopInSoftRescalingPhase() { - triggerWithoutChangeEventNoop( - TestingRescaleManagerContext::createTestInstanceInSoftRescalePhase); - } - - @Test - void triggerWithoutChangeEventNoopInHardRescalingPhase() { - triggerWithoutChangeEventNoop( - TestingRescaleManagerContext::createTestInstanceInHardRescalePhase); - } - - private void triggerWithoutChangeEventNoop( - Function testInstanceCreator) { - final TestingRescaleManagerContext ctx = - TestingRescaleManagerContext.stableContext().withDesiredRescaling(); - final DefaultRescaleManager testInstance = testInstanceCreator.apply(ctx); - - testInstance.onTrigger(); - - assertThat(ctx.rescaleWasTriggered()) - .as( - "No rescaling should have been triggered due to the missing change event despite the fact that desired rescaling would be possible.") - .isFalse(); - assertThat(ctx.additionalTasksWaiting()).as("No tasks should be scheduled.").isFalse(); - } - - @Test - void testDesiredChangeEventDuringCooldown() { - final TestingRescaleManagerContext softScalePossibleCtx = - TestingRescaleManagerContext.stableContext().withDesiredRescaling(); - final DefaultRescaleManager testInstance = - softScalePossibleCtx.createTestInstanceInCooldownPhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(softScalePossibleCtx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(softScalePossibleCtx); - - softScalePossibleCtx.transitionIntoSoftScalingTimeframe(); - - assertFinalStateWithRescale(softScalePossibleCtx); - } - - @Test - void testDesiredChangeEventInSoftRescalePhase() { - final TestingRescaleManagerContext desiredRescalePossibleCtx = - TestingRescaleManagerContext.stableContext().withDesiredRescaling(); - final DefaultRescaleManager testInstance = - desiredRescalePossibleCtx.createTestInstanceInSoftRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(desiredRescalePossibleCtx); - - testInstance.onTrigger(); - - assertFinalStateWithRescale(desiredRescalePossibleCtx); - } - - @Test - void testDesiredChangeEventInHardRescalePhase() { - final TestingRescaleManagerContext desiredRescalePossibleCtx = - TestingRescaleManagerContext.stableContext().withDesiredRescaling(); - final DefaultRescaleManager testInstance = - desiredRescalePossibleCtx.createTestInstanceInHardRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(desiredRescalePossibleCtx); - - testInstance.onTrigger(); - - assertFinalStateWithRescale(desiredRescalePossibleCtx); - } - - @Test - void testNoRescaleInCooldownPhase() { - final TestingRescaleManagerContext noRescalePossibleCtx = - TestingRescaleManagerContext.stableContext(); - final DefaultRescaleManager testInstance = - noRescalePossibleCtx.createTestInstanceInCooldownPhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(noRescalePossibleCtx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(noRescalePossibleCtx); - - noRescalePossibleCtx.transitionIntoSoftScalingTimeframe(); - - assertIntermediateStateWithoutRescale(noRescalePossibleCtx); - - noRescalePossibleCtx.transitionIntoHardScalingTimeframe(); - - assertThat(noRescalePossibleCtx.rescaleWasTriggered()) - .as("No rescaling should have happened even in the hard-rescaling phase.") - .isFalse(); - assertThat(noRescalePossibleCtx.additionalTasksWaiting()) - .as("No further tasks should have been waiting for execution.") - .isTrue(); - } - - @Test - void testNoRescaleInSoftRescalePhase() { - final TestingRescaleManagerContext noRescalePossibleCtx = - TestingRescaleManagerContext.stableContext(); - final DefaultRescaleManager testInstance = - noRescalePossibleCtx.createTestInstanceInSoftRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(noRescalePossibleCtx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(noRescalePossibleCtx); - - noRescalePossibleCtx.transitionIntoHardScalingTimeframe(); - - assertThat(noRescalePossibleCtx.rescaleWasTriggered()) - .as("No rescaling should have happened even in the hard-rescaling phase.") - .isFalse(); - assertThat(noRescalePossibleCtx.additionalTasksWaiting()) - .as("No further tasks should have been waiting for execution.") - .isTrue(); - } - - @Test - void testNoResaleInHardRescalePhase() { - final TestingRescaleManagerContext noRescalePossibleCtx = - TestingRescaleManagerContext.stableContext(); - final DefaultRescaleManager testInstance = - noRescalePossibleCtx.createTestInstanceInHardRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(noRescalePossibleCtx); - - testInstance.onTrigger(); - - assertThat(noRescalePossibleCtx.rescaleWasTriggered()) - .as("No rescaling should have happened even in the hard-rescaling phase.") - .isFalse(); - assertThat(noRescalePossibleCtx.additionalTasksWaiting()) - .as("No further tasks should have been waiting for execution.") - .isTrue(); - } - - @Test - void testSufficientChangeInCooldownPhase() { - final TestingRescaleManagerContext hardRescalePossibleCtx = - TestingRescaleManagerContext.stableContext().withSufficientRescaling(); - final DefaultRescaleManager testInstance = - hardRescalePossibleCtx.createTestInstanceInCooldownPhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(hardRescalePossibleCtx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(hardRescalePossibleCtx); - - hardRescalePossibleCtx.transitionIntoSoftScalingTimeframe(); - - assertIntermediateStateWithoutRescale(hardRescalePossibleCtx); - - hardRescalePossibleCtx.transitionIntoHardScalingTimeframe(); - - assertFinalStateWithRescale(hardRescalePossibleCtx); - } - - @Test - void testSufficientChangeInSoftRescalePhase() { - final TestingRescaleManagerContext hardRescalePossibleCtx = - TestingRescaleManagerContext.stableContext().withSufficientRescaling(); - final DefaultRescaleManager testInstance = - hardRescalePossibleCtx.createTestInstanceInSoftRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(hardRescalePossibleCtx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(hardRescalePossibleCtx); - - hardRescalePossibleCtx.transitionIntoHardScalingTimeframe(); - - assertFinalStateWithRescale(hardRescalePossibleCtx); - } - - @Test - void testSufficientChangeInHardRescalePhase() { - final TestingRescaleManagerContext hardRescalePossibleCtx = - TestingRescaleManagerContext.stableContext().withSufficientRescaling(); - final DefaultRescaleManager testInstance = - hardRescalePossibleCtx.createTestInstanceInHardRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(hardRescalePossibleCtx); - - testInstance.onTrigger(); - - assertFinalStateWithRescale(hardRescalePossibleCtx); - } - - @Test - void testSufficientChangeInCooldownWithSubsequentDesiredChangeInSoftRescalePhase() { - final TestingRescaleManagerContext ctx = - TestingRescaleManagerContext.stableContext().withSufficientRescaling(); - final DefaultRescaleManager testInstance = ctx.createTestInstanceInCooldownPhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(ctx); - - ctx.transitionIntoSoftScalingTimeframe(); - - ctx.withDesiredRescaling(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertThat(ctx.rescaleWasTriggered()).isTrue(); - assertThat(ctx.numberOfTasksWaiting()) - .as( - "There should be a task scheduled that allows transitioning into hard-rescaling phase.") - .isEqualTo(3); - } - - @Test - void testSufficientChangeWithSubsequentDesiredChangeInSoftRescalePhase() { - final TestingRescaleManagerContext ctx = - TestingRescaleManagerContext.stableContext().withSufficientRescaling(); - final DefaultRescaleManager testInstance = ctx.createTestInstanceInSoftRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(ctx); - - assertThat(ctx.numberOfTasksWaiting()) - .as( - "There should be a task scheduled that allows transitioning into hard-rescaling phase.") - .isEqualTo(2); - - ctx.withDesiredRescaling(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertThat(ctx.rescaleWasTriggered()).isTrue(); - } - - @Test - void - testRevokedSufficientChangeInSoftRescalePhaseWithSubsequentSufficientChangeInHardRescalingPhase() { - final TestingRescaleManagerContext ctx = - TestingRescaleManagerContext.stableContext().withSufficientRescaling(); - final DefaultRescaleManager testInstance = ctx.createTestInstanceInSoftRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(ctx); - - assertThat(ctx.numberOfTasksWaiting()) - .as( - "There should be a task scheduled that allows transitioning into hard-rescaling phase.") - .isEqualTo(2); - - ctx.revertAnyParallelismImprovements(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertIntermediateStateWithoutRescale(ctx); - - assertThat(ctx.numberOfTasksWaiting()) - .as( - "There should be a task scheduled that allows transitioning into hard-rescaling phase.") - .isEqualTo(2); - - ctx.transitionIntoHardScalingTimeframe(); - - assertThat(ctx.rescaleWasTriggered()) - .as( - "No rescaling should have been triggered because of the previous revert of the additional resources.") - .isFalse(); - assertThat(ctx.additionalTasksWaiting()) - .as( - "The transition to hard-rescaling should have happened without any additional tasks in waiting state.") - .isTrue(); - - ctx.withSufficientRescaling(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertFinalStateWithRescale(ctx); - } - - @Test - void testRevokedChangeInHardRescalingPhaseCausesWithSubsequentSufficientChange() { - final TestingRescaleManagerContext ctx = TestingRescaleManagerContext.stableContext(); - final DefaultRescaleManager testInstance = ctx.createTestInstanceInHardRescalePhase(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertThat(ctx.rescaleWasTriggered()).isFalse(); - assertThat(ctx.additionalTasksWaiting()).isTrue(); - - ctx.withSufficientRescaling(); - - testInstance.onChange(); - - assertIntermediateStateWithoutRescale(ctx); - - testInstance.onTrigger(); - - assertFinalStateWithRescale(ctx); - } - - private static void assertIntermediateStateWithoutRescale(TestingRescaleManagerContext ctx) { - assertThat(ctx.rescaleWasTriggered()) - .as("The rescale should not have been triggered, yet.") - .isFalse(); - assertThat(ctx.additionalTasksWaiting()) - .as("There should be still tasks being scheduled.") - .isTrue(); - } - - private static void assertFinalStateWithRescale(TestingRescaleManagerContext ctx) { - assertThat(ctx.rescaleWasTriggered()) - .as("The rescale should have been triggered already.") - .isTrue(); - assertThat(ctx.additionalTasksWaiting()) - .as("All scheduled tasks should have been executed.") - .isTrue(); - } - - /** - * {@code TestingRescaleManagerContext} provides methods for adjusting the elapsed time and for - * adjusting the available resources for rescaling. - */ - private static class TestingRescaleManagerContext implements RescaleManager.Context { - - // default configuration values to allow for easy transitioning between the phases - private static final Duration SCALING_MIN = Duration.ofHours(1); - private static final Duration SCALING_MAX = Duration.ofHours(2); - - // configuration that defines what kind of rescaling would be possible - private boolean hasSufficientResources = false; - private boolean hasDesiredResources = false; - - // internal state used for assertions - private final AtomicBoolean rescaleTriggered = new AtomicBoolean(); - private final SortedMap> scheduledTasks = new TreeMap<>(); - - // Instant.MIN makes debugging easier because timestamps become human-readable - private final Instant initializationTime = Instant.MIN; - private Duration elapsedTime = Duration.ZERO; - - // /////////////////////////////////////////////// - // Context creation - // /////////////////////////////////////////////// - - public static TestingRescaleManagerContext stableContext() { - return new TestingRescaleManagerContext(); - } - - private TestingRescaleManagerContext() { - // no rescaling is enabled by default - revertAnyParallelismImprovements(); - } - - public void revertAnyParallelismImprovements() { - this.hasSufficientResources = false; - this.hasDesiredResources = false; - } - - public TestingRescaleManagerContext withDesiredRescaling() { - // having desired resources should also mean that the sufficient resources are met - this.hasSufficientResources = true; - this.hasDesiredResources = true; - - return this; - } - - public TestingRescaleManagerContext withSufficientRescaling() { - this.hasSufficientResources = true; - this.hasDesiredResources = false; - - return this; - } - - // /////////////////////////////////////////////// - // RescaleManager.Context interface methods - // /////////////////////////////////////////////// - - @Override - public boolean hasSufficientResources() { - return this.hasSufficientResources; - } - - @Override - public boolean hasDesiredResources() { - return this.hasDesiredResources; - } - - @Override - public void rescale() { - rescaleTriggered.set(true); - } - - @Override - public void scheduleOperation(Runnable callback, Duration delay) { - final Instant triggerTime = - Objects.requireNonNull(initializationTime).plus(elapsedTime).plus(delay); - if (!scheduledTasks.containsKey(triggerTime)) { - scheduledTasks.put(triggerTime, new ArrayList<>()); - } - - scheduledTasks.get(triggerTime).add(callback); - } - - // /////////////////////////////////////////////// - // Test instance creation - // /////////////////////////////////////////////// - - /** - * Creates the {@code DefaultRescaleManager} test instance and transitions into a period in - * time where the instance is in cooldown phase. - */ - public DefaultRescaleManager createTestInstanceInCooldownPhase() { - return createTestInstance(this::transitionIntoCooldownTimeframe); - } - - /** - * Creates the {@code DefaultRescaleManager} test instance and transitions into a period in - * time where the instance is in soft-rescaling phase. - */ - public DefaultRescaleManager createTestInstanceInSoftRescalePhase() { - return createTestInstance(this::transitionIntoSoftScalingTimeframe); - } - - /** - * Creates the {@code DefaultRescaleManager} test instance and transitions into a period in - * time where the instance is in hard-rescaling phase. - */ - public DefaultRescaleManager createTestInstanceInHardRescalePhase() { - return createTestInstance(this::transitionIntoHardScalingTimeframe); - } - - /** - * Initializes the test instance and sets the context's elapsed time based on the passed - * callback. - */ - private DefaultRescaleManager createTestInstance(Runnable timeTransitioning) { - final DefaultRescaleManager testInstance = - new DefaultRescaleManager( - initializationTime, - // clock that returns the time based on the configured elapsedTime - () -> Objects.requireNonNull(initializationTime).plus(elapsedTime), - this, - SCALING_MIN, - SCALING_MAX, - Duration.ofHours(5)) { - @Override - public void onChange() { - super.onChange(); - - // hack to avoid calling this method in every test method - // we want to trigger tasks that are meant to run right-away - TestingRescaleManagerContext.this.triggerOutdatedTasks(); - } - - @Override - public void onTrigger() { - super.onTrigger(); - - // hack to avoid calling this method in every test method - // we want to trigger tasks that are meant to run right-away - TestingRescaleManagerContext.this.triggerOutdatedTasks(); - } - }; - - timeTransitioning.run(); - return testInstance; - } - - // /////////////////////////////////////////////// - // Time-adjustment functionality - // /////////////////////////////////////////////// - - /** - * Transitions the context's time to a moment that falls into the test instance's cooldown - * phase. - */ - public void transitionIntoCooldownTimeframe() { - this.elapsedTime = SCALING_MIN.dividedBy(2); - this.triggerOutdatedTasks(); - } - - /** - * Transitions the context's time to a moment that falls into the test instance's - * soft-scaling phase. - */ - public void transitionIntoSoftScalingTimeframe() { - // the state transition is scheduled based on the current event's time rather than the - // initializationTime - this.elapsedTime = elapsedTime.plus(SCALING_MIN); - - // make sure that we're still below the scalingIntervalMax - this.elapsedTime = elapsedTime.plus(SCALING_MAX.minus(elapsedTime).dividedBy(2)); - this.triggerOutdatedTasks(); - } - - /** - * Transitions the context's time to a moment that falls into the test instance's - * hard-scaling phase. - */ - public void transitionIntoHardScalingTimeframe() { - // the state transition is scheduled based on the current event's time rather than the - // initializationTime - this.elapsedTime = elapsedTime.plus(SCALING_MAX).plusMinutes(1); - this.triggerOutdatedTasks(); - } - - private void triggerOutdatedTasks() { - while (!scheduledTasks.isEmpty()) { - final Instant timeOfExecution = scheduledTasks.firstKey(); - if (!timeOfExecution.isAfter( - Objects.requireNonNull(initializationTime).plus(elapsedTime))) { - scheduledTasks.remove(timeOfExecution).forEach(Runnable::run); - } else { - break; - } - } - } - - // /////////////////////////////////////////////// - // Methods for verifying the context's state - // /////////////////////////////////////////////// - - public boolean rescaleWasTriggered() { - return rescaleTriggered.get(); - } - - public int numberOfTasksWaiting() { - return scheduledTasks.size(); - } - - public boolean additionalTasksWaiting() { - return !scheduledTasks.isEmpty(); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java new file mode 100644 index 0000000000000..ca90aa2a1c556 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.testutils.ScheduledTask; +import org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Idling; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.Preconditions; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import static org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Cooldown; +import static org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Phase; +import static org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilized; +import static org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilizing; +import static org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Transitioning; +import static org.assertj.core.api.Assertions.assertThat; + +class DefaultStateTransitionManagerTest { + + @Test + void testProperConfiguration() throws ConfigurationException { + final Duration cooldownTimeout = Duration.ofMillis(1337); + final Duration resourceStabilizationTimeout = Duration.ofMillis(7331); + final Duration maximumDelayForRescaleTrigger = Duration.ofMillis(4242); + + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, cooldownTimeout); + configuration.set( + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, resourceStabilizationTimeout); + configuration.set( + JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER, maximumDelayForRescaleTrigger); + + final DefaultStateTransitionManager testInstance = + DefaultStateTransitionManager.Factory.fromSettings( + AdaptiveScheduler.Settings.of(configuration)) + .create( + TestingStateTransitionManagerContext.stableContext(), + Instant.now()); + assertThat(testInstance.cooldownTimeout).isEqualTo(cooldownTimeout); + assertThat(testInstance.resourceStabilizationTimeout) + .isEqualTo(resourceStabilizationTimeout); + assertThat(testInstance.maxTriggerDelay).isEqualTo(maximumDelayForRescaleTrigger); + } + + @Test + void testTriggerWithoutChangeEventNoopInCooldownPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withDesiredResources(); + final DefaultStateTransitionManager testInstance = ctx.createTestInstanceInCooldownPhase(); + triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class); + } + + @Test + void testTriggerWithoutChangeEventNoopInIdlingPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withDesiredResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceThatPassedCooldownPhase(); + triggerWithoutPhaseMove(ctx, testInstance, Idling.class); + } + + @Test + void testTriggerWithoutChangeEventNoopInTransitioningPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withDesiredResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceInTransitioningPhase(); + triggerWithoutPhaseMove(ctx, testInstance, Transitioning.class); + } + + @Test + void testStateTransitionRightAfterCooldown() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withDesiredResources(); + final DefaultStateTransitionManager testInstance = ctx.createTestInstanceInCooldownPhase(); + + changeWithoutPhaseMove(ctx, testInstance, Cooldown.class); + + triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class); + + ctx.transitionToInclusiveCooldownEnd(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Cooldown.class); + + testInstance.onTrigger(); + + ctx.passTime(Duration.ofMillis(1)); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testDesiredChangeInCooldownPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withDesiredResources(); + final DefaultStateTransitionManager testInstance = ctx.createTestInstanceInCooldownPhase(); + + changeWithoutPhaseMove(ctx, testInstance, Cooldown.class); + + triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class); + + ctx.transitionOutOfCooldownPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testDesiredChangeInIdlingPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withDesiredResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceThatPassedCooldownPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class); + + testInstance.onChange(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testDesiredChangeInStabilizedPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceInStabilizedPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class); + + withDesiredChange(ctx, testInstance); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testDesiredResourcesInStabilizingPhaseAfterMaxTriggerDelay() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceWithoutStabilizationTimeout( + manager -> { + manager.onChange(); + ctx.passTime(TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT); + }); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + + ctx.passMaxDelayTriggerTimeout(); + + withDesiredChange(ctx, testInstance); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + + ctx.passMaxDelayTriggerTimeout(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testNoResourcesChangeInCooldownPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext(); + final DefaultStateTransitionManager testInstance = ctx.createTestInstanceInCooldownPhase(); + + changeWithoutPhaseMove(ctx, testInstance, Cooldown.class); + + triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class); + + ctx.transitionOutOfCooldownPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class); + } + + @Test + void testNoResourcesChangeInIdlingPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceThatPassedCooldownPhase(); + + changeWithoutPhaseMove(ctx, testInstance, Idling.class); + + triggerWithoutPhaseMove(ctx, testInstance, Idling.class); + } + + @Test + void testSufficientResourcesInStabilizingPhaseAfterMaxTriggerDelay() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceWithoutStabilizationTimeout( + manager -> { + manager.onChange(); + ctx.passTime(TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT); + }); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + + ctx.passMaxDelayTriggerTimeout(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + } + + @Test + void testSufficientResourcesInStabilizedPhaseAfterMaxTriggerDelay() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceInStabilizedPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class); + + ctx.passMaxDelayTriggerTimeout(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testSufficientChangeInCooldownPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = ctx.createTestInstanceInCooldownPhase(); + + changeWithoutPhaseMove(ctx, testInstance, Cooldown.class); + + triggerWithoutPhaseMove(ctx, testInstance, Cooldown.class); + + ctx.transitionOutOfCooldownPhase(); + + triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class); + + ctx.passResourceStabilizationTimeout(); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testSufficientChangeInIdlingPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceThatPassedCooldownPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class); + + testInstance.onChange(); + + triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class); + + ctx.passResourceStabilizationTimeout(); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testSufficientChangeInStabilizedPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceInStabilizedPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testSufficientChangeWithSubsequentDesiredChangeInStabilizingPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceThatPassedCooldownPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class); + + testInstance.onChange(); + + triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class); + + withDesiredChange(ctx, testInstance); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testRevokedChangeInStabilizingPhaseWithSubsequentSufficientChangeInStabilizedPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceThatPassedCooldownPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class); + + testInstance.onChange(); + + triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class); + + ctx.withRevokeResources(); + + changeWithoutPhaseMove(ctx, testInstance, Stabilizing.class); + + triggerWithoutPhaseMove(ctx, testInstance, Stabilizing.class); + + ctx.passResourceStabilizationTimeout(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class); + + withSufficientChange(ctx, testInstance); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class); + + testInstance.onTrigger(); + + assertFinalStateTransitionHappened(ctx, testInstance); + } + + @Test + void testRevokedChangeInStabilizedPhase() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withSufficientResources(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceInStabilizedPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilized.class); + + ctx.withRevokeResources(); + + testInstance.onTrigger(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class); + } + + @Test + void testScheduledTaskBeingIgnoredAfterStateChanged() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext(); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceInStabilizedPhase(); + + final AtomicBoolean callbackCalled = new AtomicBoolean(); + testInstance.scheduleFromNow( + () -> callbackCalled.set(true), Duration.ZERO, new TestPhase()); + ctx.triggerOutdatedTasks(); + + assertThat(callbackCalled).isFalse(); + } + + private static class TestPhase extends Phase { + private TestPhase() { + super(Instant::now, null); + } + } + + private static void assertPhaseWithoutStateTransition( + TestingStateTransitionManagerContext ctx, + DefaultStateTransitionManager testInstance, + Class expectedPhase) { + assertThat(ctx.stateTransitionWasTriggered()).isFalse(); + assertThat(testInstance.getPhase()).isInstanceOf(expectedPhase); + } + + private static void assertFinalStateTransitionHappened( + TestingStateTransitionManagerContext ctx, DefaultStateTransitionManager testInstance) { + assertThat(ctx.stateTransitionWasTriggered()).isTrue(); + assertThat(testInstance.getPhase()).isInstanceOf(Transitioning.class); + } + + private static void changeWithoutPhaseMove( + TestingStateTransitionManagerContext ctx, + DefaultStateTransitionManager testInstance, + Class expectedPhase) { + assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase); + + testInstance.onChange(); + + assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase); + } + + private static void triggerWithoutPhaseMove( + TestingStateTransitionManagerContext ctx, + DefaultStateTransitionManager testInstance, + Class expectedPhase) { + + assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase); + + testInstance.onTrigger(); + + assertPhaseWithoutStateTransition(ctx, testInstance, expectedPhase); + } + + private static void withSufficientChange( + TestingStateTransitionManagerContext ctx, DefaultStateTransitionManager testInstance) { + + ctx.withSufficientResources(); + testInstance.onChange(); + } + + private static void withDesiredChange( + TestingStateTransitionManagerContext ctx, DefaultStateTransitionManager testInstance) { + + ctx.withDesiredResources(); + testInstance.onChange(); + } + + /** + * {@code TestingStateTransitionManagerContext} provides methods for adjusting the elapsed time + * and for adjusting the available resources for rescaling. + */ + private static class TestingStateTransitionManagerContext + implements StateTransitionManager.Context { + + // default configuration values to allow for easy transitioning between the phases + private static final Duration COOLDOWN_TIMEOUT = Duration.ofHours(1); + private static final Duration RESOURCE_STABILIZATION_TIMEOUT = Duration.ofHours(2); + private static final Duration MAX_TRIGGER_DELAY = + RESOURCE_STABILIZATION_TIMEOUT.plus(Duration.ofMinutes(10)); + + // configuration that defines what kind of rescaling would be possible + private boolean hasSufficientResources = false; + private boolean hasDesiredResources = false; + + // internal state used for assertions + private final AtomicBoolean transitionTriggered = new AtomicBoolean(); + private final SortedMap>> scheduledTasks = + new TreeMap<>(); + + // Instant.MIN makes debugging easier because timestamps become human-readable + private final Instant initializationTime = Instant.MIN; + private Duration elapsedTime = Duration.ZERO; + + // /////////////////////////////////////////////// + // Context creation + // /////////////////////////////////////////////// + + public static TestingStateTransitionManagerContext stableContext() { + return new TestingStateTransitionManagerContext(); + } + + private TestingStateTransitionManagerContext() { + // no rescaling is enabled by default + withRevokeResources(); + } + + public TestingStateTransitionManagerContext withRevokeResources() { + this.hasSufficientResources = false; + this.hasDesiredResources = false; + + return this; + } + + public TestingStateTransitionManagerContext withDesiredResources() { + // having desired resources should also mean that the sufficient resources are met + this.hasSufficientResources = true; + this.hasDesiredResources = true; + + return this; + } + + public TestingStateTransitionManagerContext withSufficientResources() { + this.hasSufficientResources = true; + this.hasDesiredResources = false; + + return this; + } + + // /////////////////////////////////////////////// + // StateTransitionManager.Context interface methods + // /////////////////////////////////////////////// + + @Override + public boolean hasSufficientResources() { + return this.hasSufficientResources; + } + + @Override + public boolean hasDesiredResources() { + return this.hasDesiredResources; + } + + @Override + public void transitionToSubsequentState() { + transitionTriggered.set(true); + } + + @Override + public ScheduledFuture scheduleOperation(Runnable callback, Duration delay) { + final Instant triggerTime = + Objects.requireNonNull(initializationTime).plus(elapsedTime).plus(delay); + if (!scheduledTasks.containsKey(triggerTime)) { + scheduledTasks.put(triggerTime, new ArrayList<>()); + } + ScheduledTask scheduledTask = + new ScheduledTask<>(Executors.callable(callback), delay.toMillis()); + scheduledTasks.get(triggerTime).add(scheduledTask); + return scheduledTask; + } + + // /////////////////////////////////////////////// + // Test instance creation + // /////////////////////////////////////////////// + + /** + * Creates the {@code DefaultStateTransitionManager} test instance and advances into a + * period in time where the instance is in cooldown phase. + */ + public DefaultStateTransitionManager createTestInstanceInCooldownPhase() { + return createTestInstance(ignored -> this.transitionIntoCooldownTimeframe()); + } + + /** + * Creates the {@code DefaultStateTransitionManager} test instance and advances into a + * period in time where the instance is in stabilizing phase. + */ + public DefaultStateTransitionManager createTestInstanceThatPassedCooldownPhase() { + return createTestInstance(ignored -> this.transitionOutOfCooldownPhase()); + } + + /** + * Creates the {@code DefaultStateTransitionManager} test instance and advances into a + * period in time where the instance is in stabilized phase. + */ + public DefaultStateTransitionManager createTestInstanceInStabilizedPhase() { + return createTestInstance( + manager -> { + manager.onChange(); + passResourceStabilizationTimeout(); + }); + } + + /** + * Creates the {@code DefaultStateTransitionManager} test instance in terminal transitioning + * phase. + */ + public DefaultStateTransitionManager createTestInstanceInTransitioningPhase() { + return createTestInstance( + manager -> { + manager.onChange(); + passResourceStabilizationTimeout(); + manager.onTrigger(); + clearStateTransition(); + }); + } + + /** + * Initializes the test instance with set stabilization timeout and sets the context's + * elapsed time based on the passed callback. + */ + public DefaultStateTransitionManager createTestInstance( + Consumer callback) { + return createTestInstance(callback, RESOURCE_STABILIZATION_TIMEOUT); + } + + /** + * Initializes the test instance without stabilization timeout and sets the context's + * elapsed time based on the passed callback. + */ + public DefaultStateTransitionManager createTestInstanceWithoutStabilizationTimeout( + Consumer callback) { + return createTestInstance(callback, null); + } + + private DefaultStateTransitionManager createTestInstance( + Consumer callback, + @Nullable Duration resourceStabilizationTimeout) { + final DefaultStateTransitionManager testInstance = + new DefaultStateTransitionManager( + initializationTime, + // clock that returns the time based on the configured elapsedTime + () -> Objects.requireNonNull(initializationTime).plus(elapsedTime), + this, + TestingStateTransitionManagerContext.COOLDOWN_TIMEOUT, + resourceStabilizationTimeout, + TestingStateTransitionManagerContext.MAX_TRIGGER_DELAY) { + @Override + public void onChange() { + super.onChange(); + + // hack to avoid calling this method in every test method + // we want to trigger tasks that are meant to run right-away + TestingStateTransitionManagerContext.this.triggerOutdatedTasks(); + } + + @Override + public void onTrigger() { + super.onTrigger(); + + // hack to avoid calling this method in every test method + // we want to trigger tasks that are meant to run right-away + TestingStateTransitionManagerContext.this.triggerOutdatedTasks(); + } + }; + + callback.accept(testInstance); + return testInstance; + } + + // /////////////////////////////////////////////// + // Time-adjustment functionality + // /////////////////////////////////////////////// + + /** + * Transitions the context's time to a moment that falls into the test instance's cooldown + * phase. + */ + public void transitionIntoCooldownTimeframe() { + setElapsedTime(COOLDOWN_TIMEOUT.dividedBy(2)); + this.triggerOutdatedTasks(); + } + + public void transitionOutOfCooldownPhase() { + this.setElapsedTime(COOLDOWN_TIMEOUT.plusMillis(1)); + } + + public void passResourceStabilizationTimeout() { + // resource stabilization is based on the current time + this.passTime(RESOURCE_STABILIZATION_TIMEOUT.plusMillis(1)); + } + + public void transitionToInclusiveCooldownEnd() { + setElapsedTime(COOLDOWN_TIMEOUT.minusMillis(1)); + } + + public void passMaxDelayTriggerTimeout() { + this.passTime(MAX_TRIGGER_DELAY.plusMillis(1)); + } + + public void passTime(Duration elapsed) { + setElapsedTime(this.elapsedTime.plus(elapsed)); + } + + public void setElapsedTime(Duration elapsedTime) { + Preconditions.checkState( + this.elapsedTime.compareTo(elapsedTime) <= 0, + "The elapsed time should monotonically increase."); + this.elapsedTime = elapsedTime; + this.triggerOutdatedTasks(); + } + + private void triggerOutdatedTasks() { + while (!scheduledTasks.isEmpty()) { + final Instant timeOfExecution = scheduledTasks.firstKey(); + if (!timeOfExecution.isAfter( + Objects.requireNonNull(initializationTime).plus(elapsedTime))) { + scheduledTasks.remove(timeOfExecution).forEach(ScheduledTask::execute); + } else { + break; + } + } + } + + // /////////////////////////////////////////////// + // Methods for verifying the context's state + // /////////////////////////////////////////////// + + public boolean stateTransitionWasTriggered() { + return transitionTriggered.get(); + } + + public void clearStateTransition() { + transitionTriggered.set(false); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index 48ee70bfd297f..b49c1c9b35d1a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -155,7 +155,7 @@ void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception { ctx, ClassLoader.getSystemClassLoader(), new ArrayList<>(), - TestingRescaleManager.Factory.noOpFactory(), + TestingStateTransitionManager.Factory.noOpFactory(), 1, 1, Instant.now()); @@ -183,7 +183,7 @@ void testIllegalStateExceptionOnNotRunningExecutionGraph() { ctx, ClassLoader.getSystemClassLoader(), new ArrayList<>(), - TestingRescaleManager.Factory.noOpFactory(), + TestingStateTransitionManager.Factory.noOpFactory(), 1, 1, Instant.now()); @@ -195,12 +195,13 @@ void testIllegalStateExceptionOnNotRunningExecutionGraph() { @Test public void testTriggerRescaleOnCompletedCheckpoint() throws Exception { final AtomicBoolean rescaleTriggered = new AtomicBoolean(); - final RescaleManager.Factory rescaleManagerFactory = - new TestingRescaleManager.Factory(() -> {}, () -> rescaleTriggered.set(true)); + final StateTransitionManager.Factory stateTransitionManagerFactory = + new TestingStateTransitionManager.Factory( + () -> {}, () -> rescaleTriggered.set(true)); try (MockExecutingContext ctx = new MockExecutingContext()) { final Executing testInstance = new ExecutingStateBuilder() - .setRescaleManagerFactory(rescaleManagerFactory) + .setStateTransitionManagerFactory(stateTransitionManagerFactory) .build(ctx); assertThat(rescaleTriggered).isFalse(); @@ -212,13 +213,14 @@ public void testTriggerRescaleOnCompletedCheckpoint() throws Exception { @Test public void testTriggerRescaleOnFailedCheckpoint() throws Exception { final AtomicInteger rescaleTriggerCount = new AtomicInteger(); - final RescaleManager.Factory rescaleManagerFactory = - new TestingRescaleManager.Factory(() -> {}, rescaleTriggerCount::incrementAndGet); + final StateTransitionManager.Factory stateTransitionManagerFactory = + new TestingStateTransitionManager.Factory( + () -> {}, rescaleTriggerCount::incrementAndGet); final int rescaleOnFailedCheckpointsCount = 3; try (MockExecutingContext ctx = new MockExecutingContext()) { final Executing testInstance = new ExecutingStateBuilder() - .setRescaleManagerFactory(rescaleManagerFactory) + .setStateTransitionManagerFactory(stateTransitionManagerFactory) .setRescaleOnFailedCheckpointCount(rescaleOnFailedCheckpointsCount) .build(ctx); @@ -254,13 +256,14 @@ public void testTriggerRescaleOnFailedCheckpoint() throws Exception { @Test public void testOnCompletedCheckpointResetsFailedCheckpointCount() throws Exception { final AtomicInteger rescaleTriggeredCount = new AtomicInteger(); - final RescaleManager.Factory rescaleManagerFactory = - new TestingRescaleManager.Factory(() -> {}, rescaleTriggeredCount::incrementAndGet); + final StateTransitionManager.Factory stateTransitionManagerFactory = + new TestingStateTransitionManager.Factory( + () -> {}, rescaleTriggeredCount::incrementAndGet); final int rescaleOnFailedCheckpointsCount = 3; try (MockExecutingContext ctx = new MockExecutingContext()) { final Executing testInstance = new ExecutingStateBuilder() - .setRescaleManagerFactory(rescaleManagerFactory) + .setStateTransitionManagerFactory(stateTransitionManagerFactory) .setRescaleOnFailedCheckpointCount(rescaleOnFailedCheckpointsCount) .build(ctx); @@ -581,8 +584,8 @@ void testExecutingChecksForNewResourcesWhenBeingCreated() throws Exception { final Queue actualEvents = new ArrayDeque<>(); try (MockExecutingContext ctx = new MockExecutingContext()) { new ExecutingStateBuilder() - .setRescaleManagerFactory( - new TestingRescaleManager.Factory( + .setStateTransitionManagerFactory( + new TestingStateTransitionManager.Factory( () -> actualEvents.add(onChangeEventLabel), () -> actualEvents.add(onTriggerEventLabel))) .build(ctx); @@ -606,8 +609,8 @@ private final class ExecutingStateBuilder { TestingDefaultExecutionGraphBuilder.newBuilder() .build(EXECUTOR_EXTENSION.getExecutor()); private OperatorCoordinatorHandler operatorCoordinatorHandler; - private RescaleManager.Factory rescaleManagerFactory = - TestingRescaleManager.Factory.noOpFactory(); + private StateTransitionManager.Factory stateTransitionManagerFactory = + TestingStateTransitionManager.Factory.noOpFactory(); private int rescaleOnFailedCheckpointCount = 1; private ExecutingStateBuilder() throws JobException, JobExecutionException { @@ -625,9 +628,9 @@ public ExecutingStateBuilder setOperatorCoordinatorHandler( return this; } - public ExecutingStateBuilder setRescaleManagerFactory( - RescaleManager.Factory rescaleManagerFactory) { - this.rescaleManagerFactory = rescaleManagerFactory; + public ExecutingStateBuilder setStateTransitionManagerFactory( + StateTransitionManager.Factory stateTransitionManagerFactory) { + this.stateTransitionManagerFactory = stateTransitionManagerFactory; return this; } @@ -649,7 +652,7 @@ private Executing build(MockExecutingContext ctx) { ctx, ClassLoader.getSystemClassLoader(), new ArrayList<>(), - rescaleManagerFactory, + stateTransitionManagerFactory, 1, rescaleOnFailedCheckpointCount, // will be ignored by the TestingRescaleManager.Factory diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java similarity index 70% rename from flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java index cc435ea4f7b40..3f4573cd74e3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingRescaleManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingStateTransitionManager.java @@ -20,12 +20,13 @@ import java.time.Instant; -public class TestingRescaleManager implements RescaleManager { +/** Testing implementation for {@link StateTransitionManager}. */ +public class TestingStateTransitionManager implements StateTransitionManager { private final Runnable onChangeRunnable; private final Runnable onTriggerRunnable; - private TestingRescaleManager(Runnable onChangeRunnable, Runnable onTriggerRunnable) { + private TestingStateTransitionManager(Runnable onChangeRunnable, Runnable onTriggerRunnable) { this.onChangeRunnable = onChangeRunnable; this.onTriggerRunnable = onTriggerRunnable; } @@ -40,12 +41,15 @@ public void onTrigger() { this.onTriggerRunnable.run(); } - public static class Factory implements RescaleManager.Factory { + /** + * {@code Factory} implementation for creating {@code TestingStateTransitionManager} instances. + */ + public static class Factory implements StateTransitionManager.Factory { private final Runnable onChangeRunnable; private final Runnable onTriggerRunnable; - public static TestingRescaleManager.Factory noOpFactory() { + public static TestingStateTransitionManager.Factory noOpFactory() { return new Factory(() -> {}, () -> {}); } @@ -55,8 +59,8 @@ public Factory(Runnable onChangeRunnable, Runnable onTriggerRunnable) { } @Override - public RescaleManager create(Context ignoredContext, Instant ignoredLastRescale) { - return new TestingRescaleManager(onChangeRunnable, onTriggerRunnable); + public StateTransitionManager create(Context ignoredContext, Instant ignoredLastRescale) { + return new TestingStateTransitionManager(onChangeRunnable, onTriggerRunnable); } } }