[Bug] Job cannot recover from checkpoint/savepoint if parallelism is changed from 1 to 2 #4543

Open
Tan-JiaLiang opened this issue on Nov 18, 2024 · 5 comments
Labels: bug (Something isn't working)

@Tan-JiaLiang (Contributor)
Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

0.9.0

Compute Engine

Flink

Minimal reproduce step

  1. Start a job writing to a Paimon append-only table with parallelism=1.
  2. Stop the job.
  3. Restore the job from the checkpoint, changing the job's parallelism to 2.
  4. An error appears and the job cannot restore from the checkpoint.

What doesn't meet your expectations?

The job should be able to restore from a checkpoint/savepoint even if I change the parallelism.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
Tan-JiaLiang added the bug label on Nov 18, 2024

@Tan-JiaLiang (Contributor, Author)

if (enableCompaction && isStreamingMode) {
    written =
            written.transform(
                            "Compact Coordinator: " + table.name(),
                            new EitherTypeInfo<>(
                                    new CommittableTypeInfo(),
                                    new CompactionTaskTypeInfo()),
                            new AppendBypassCoordinateOperatorFactory<>(table))
                    .forceNonParallel()
                    .transform(
                            "Compact Worker: " + table.name(),
                            new CommittableTypeInfo(),
                            new AppendBypassCompactWorkerOperator(table, initialCommitUser))
                    .setParallelism(written.getParallelism());
}

If the job parallelism is 1, the Writer operator and the Compact Coordinator operator get chained together. But since the Compact Coordinator's parallelism is always 1, raising the job parallelism breaks that chain apart, which changes the auto-generated operator IDs, so Flink can no longer map the checkpointed state to the new topology and recovery fails.

We need to disable chaining between the writer operator and the compact coordinator operator.
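A minimal sketch of what that proposal could look like, assuming it is applied at the spot quoted above (disableChaining() is standard Flink DataStream API; applying it here is the suggestion under discussion, not what Paimon 0.9.0 does):

if (enableCompaction && isStreamingMode) {
    written =
            written.transform(
                            "Compact Coordinator: " + table.name(),
                            new EitherTypeInfo<>(
                                    new CommittableTypeInfo(),
                                    new CompactionTaskTypeInfo()),
                            new AppendBypassCoordinateOperatorFactory<>(table))
                    .forceNonParallel()
                    // Proposed: keep the parallelism-1 coordinator in its own chain
                    // so rescaling the writer can never change the chain layout
                    // (and with it the auto-generated operator IDs).
                    .disableChaining()
                    .transform(
                            "Compact Worker: " + table.name(),
                            new CommittableTypeInfo(),
                            new AppendBypassCompactWorkerOperator(table, initialCommitUser))
                    .setParallelism(written.getParallelism());
}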

@Tan-JiaLiang (Contributor, Author)

if (options.get(CHANGELOG_PRECOMMIT_COMPACT)) {
    written =
            written.transform(
                            "Changelog Compact Coordinator",
                            new EitherTypeInfo<>(
                                    new CommittableTypeInfo(), new ChangelogTaskTypeInfo()),
                            new ChangelogCompactCoordinateOperator(table))
                    .forceNonParallel()
                    .transform(
                            "Changelog Compact Worker",
                            new CommittableTypeInfo(),
                            new ChangelogCompactWorkerOperator(table))
                    .setParallelism(written.getParallelism());
}

The same issue exists here as in org.apache.paimon.flink.sink.FlinkSink#doWrite.

@Tan-JiaLiang (Contributor, Author)

@JingsongLi WDYT? Should we add an option to control this? Like #3232.

@Tan-JiaLiang (Contributor, Author)

I think #4424 can solve this problem. But do we still need to add the disable-chaining change? Or do we just need to recommend that users add the 'sink.operator-uid.suffix' and 'source.operator-uid.suffix' options to their Flink job?
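For reference, a rough sketch of that recommendation, assuming a Flink SQL job against a Paimon catalog; the table name t and the suffix value are hypothetical placeholders (the value just needs to stay stable across restarts):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: persist the UID-suffix options from the discussion above on a
// Paimon table so the sink/source operators get stable, explicit UIDs.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
        "ALTER TABLE t SET ("
                + "'sink.operator-uid.suffix' = 'paimon-v1', "
                + "'source.operator-uid.suffix' = 'paimon-v1')");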

@huyuanfeng2018 (Contributor)

> I think #4424 can solve this problem. But do we still need to add the disable-chaining change? Or do we just need to recommend that users add the 'sink.operator-uid.suffix' and 'source.operator-uid.suffix' options to their Flink job?

@Tan-JiaLiang
I think #4424 is enough to solve the problem of not being able to recover from a checkpoint, but I think #3232 is also necessary: if the Flink adaptive scheduler is used, we can quickly modify the writer's parallelism in the Flink web UI without restarting the job (when chaining is disabled).
