Skip to content

Commit

Permalink
[FLINK-36011] [runtime] Generalize RescaleManager to become StateTran…
Browse files Browse the repository at this point in the history
…sitionManager
  • Loading branch information
ztison authored and XComp committed Sep 2, 2024
1 parent 339f97c commit c869326
Show file tree
Hide file tree
Showing 11 changed files with 1,250 additions and 975 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public int getRescaleOnFailedCheckpointCount() {
}

private final Settings settings;
private final RescaleManager.Factory rescaleManagerFactory;
private final StateTransitionManager.Factory stateTransitionManagerFactory;

private final JobGraph jobGraph;

Expand Down Expand Up @@ -427,7 +427,7 @@ public AdaptiveScheduler(
throws JobExecutionException {
this(
settings,
DefaultRescaleManager.Factory.fromSettings(settings),
DefaultStateTransitionManager.Factory.fromSettings(settings),
(metricGroup, checkpointStatsListener) ->
new DefaultCheckpointStatsTracker(
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
Expand Down Expand Up @@ -455,7 +455,7 @@ public AdaptiveScheduler(
@VisibleForTesting
AdaptiveScheduler(
Settings settings,
RescaleManager.Factory rescaleManagerFactory,
StateTransitionManager.Factory stateTransitionManagerFactory,
BiFunction<JobManagerJobMetricGroup, CheckpointStatsListener, CheckpointStatsTracker>
checkpointStatsTrackerFactory,
JobGraph jobGraph,
Expand All @@ -480,7 +480,7 @@ public AdaptiveScheduler(
assertPreconditions(jobGraph);

this.settings = settings;
this.rescaleManagerFactory = rescaleManagerFactory;
this.stateTransitionManagerFactory = stateTransitionManagerFactory;

this.jobGraph = jobGraph;
this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), jobGraph.getName());
Expand Down Expand Up @@ -1175,7 +1175,7 @@ public void goToExecuting(
this,
userCodeClassLoader,
failureCollection,
rescaleManagerFactory,
stateTransitionManagerFactory,
settings.getMinParallelismChangeForDesiredRescale(),
settings.getRescaleOnFailedCheckpointCount()));
}
Expand Down

This file was deleted.

Loading

0 comments on commit c869326

Please sign in to comment.