[Flink] Support speculative execution when batch read paimon table. #4395

Merged


@wwj6591812 wwj6591812 commented Oct 29, 2024

Purpose

image

In my company's production job, batch reading a Paimon table with speculative execution fails with the exception shown above.
After reading the Flink and Paimon code, I found the cause: Paimon uses a custom SourceEvent, ReaderConsumeProgressEvent.
I came up with two solutions:
1. Implement the interface SupportsHandleExecutionAttemptSourceEvent, as the exception message suggests;
2. Remove ReaderConsumeProgressEvent from StaticFileStoreSplitEnumerator.

Because other SourceEvent types may be added in the future, I chose solution 1.
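
Solution 1 can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this PR: the nested `SourceEvent`, `SupportsHandleExecutionAttemptSourceEvent`, and `ReaderConsumeProgressEvent` types are simplified stand-ins for the Flink and Paimon types so that the example compiles without those dependencies, and `StaticEnumeratorSketch` with its `log` field is hypothetical. The three-argument `handleSourceEvent` is assumed to mirror the method declared by Flink's interface.

```java
import java.util.ArrayList;
import java.util.List;

public class SpeculativeEventSketch {

    /** Stand-in for Flink's SourceEvent marker interface (assumption for this sketch). */
    interface SourceEvent {}

    /** Stand-in for Flink's attempt-aware interface (assumed three-argument method). */
    interface SupportsHandleExecutionAttemptSourceEvent {
        void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent);
    }

    /** Simplified stand-in for Paimon's ReaderConsumeProgressEvent. */
    static class ReaderConsumeProgressEvent implements SourceEvent {}

    /** Hypothetical enumerator that records what it received. */
    static class StaticEnumeratorSketch implements SupportsHandleExecutionAttemptSourceEvent {
        final List<String> log = new ArrayList<>();

        @Override
        public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
            // The attempt number is dropped: only events whose handling does not
            // depend on the sending attempt are recognized here.
            handleSourceEvent(subtaskId, sourceEvent);
        }

        public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
            if (sourceEvent instanceof ReaderConsumeProgressEvent) {
                log.add("progress:" + subtaskId);
            } else {
                log.add("unrecognized:" + subtaskId);
            }
        }
    }

    public static void main(String[] args) {
        StaticEnumeratorSketch enumerator = new StaticEnumeratorSketch();
        // Two attempts of subtask 0 send the same kind of event.
        enumerator.handleSourceEvent(0, 0, new ReaderConsumeProgressEvent());
        enumerator.handleSourceEvent(0, 1, new ReaderConsumeProgressEvent());
        System.out.println(enumerator.log);
    }
}
```

Running `main` prints `[progress:0, progress:0]`: events from both attempts reach the same two-argument handler, which is only safe as long as that handler does not care which attempt sent the event.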

Linked issue: close #xxx

Tests

API and Format

Documentation

@wwj6591812 wwj6591812 changed the title [Flink] support speculative execution when batch read paimon table [Flink] Support speculative execution when batch read paimon table. Oct 29, 2024
@@ -41,7 +42,8 @@

 /** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */
 public class StaticFileStoreSplitEnumerator
-        implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
+        implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint>,
+                SupportsHandleExecutionAttemptSourceEvent {
Contributor

You may need to introduce classes for Flink 1.15.

Contributor Author

Could we drop Flink 1.15 support, the same way spark-3.1 was removed (#3941)?

We also plan to support Flink-2.0. To avoid maintaining too many Flink versions, could we remove support for Flink-1.15?

@JingsongLi

Contributor

No, it is good to keep Flink 1.15 support.

Contributor Author

OK

@wwj6591812 wwj6591812 force-pushed the support_speculative_interface_1029 branch from 2e799fe to 766d8d2 Compare October 30, 2024 03:29
@wwj6591812 wwj6591812 changed the title [Flink] Support speculative execution when batch read paimon table. [WIP][Flink] Support speculative execution when batch read paimon table. Oct 30, 2024
Contributor

@JingsongLi JingsongLi left a comment

A simpler solution for 1.15, you can just create a class org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent in paimon-flink-1.15.

@wwj6591812 wwj6591812 force-pushed the support_speculative_interface_1029 branch from 766d8d2 to 35b595c Compare October 30, 2024 06:48
@wwj6591812
Contributor Author

> A simpler solution for 1.15, you can just create a class org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent in paimon-flink-1.15.

Good idea, done.

@wwj6591812 wwj6591812 changed the title [WIP][Flink] Support speculative execution when batch read paimon table. [Flink] Support speculative execution when batch read paimon table. Oct 30, 2024
        // Only recognize events that don't care attemptNumber
        handleSourceEvent(subtaskId, sourceEvent);
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof ReaderConsumeProgressEvent) {

Maybe add a comment noting that, when adding support for a new kind of event, one needs to consider whether the event can be sent multiple times from different attempts of the same subtask. See the description of SupportsHandleExecutionAttemptSourceEvent for more details.

@zhuzhurk zhuzhurk Oct 30, 2024

This is in case someone later adds a new kind of event directly to this method and overlooks the method that takes attemptNumber.

Contributor Author

thx, done
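
The concern raised in this thread can be illustrated with a small sketch. Under speculative execution, several attempts of the same subtask may send the same event, so a handler with side effects should produce the same result no matter how many attempts report. Everything below is hypothetical and exists only for illustration (`SplitFinishedEvent`, `CountingEnumerator`, and the local `SourceEvent` stand-in); none of it is Paimon or Flink code.

```java
import java.util.HashSet;
import java.util.Set;

public class AttemptAwareSketch {

    /** Stand-in for Flink's SourceEvent marker interface (assumption for this sketch). */
    interface SourceEvent {}

    /** Hypothetical event that a reader sends when it finishes a split. */
    static class SplitFinishedEvent implements SourceEvent {
        final String splitId;

        SplitFinishedEvent(String splitId) {
            this.splitId = splitId;
        }
    }

    /** Hypothetical enumerator that counts finished splits idempotently. */
    static class CountingEnumerator {
        private final Set<String> finishedSplits = new HashSet<>();

        void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent event) {
            if (event instanceof SplitFinishedEvent) {
                // Set.add ignores repeats, so a duplicate report from another
                // attempt of the same subtask does not count the split twice.
                finishedSplits.add(((SplitFinishedEvent) event).splitId);
            }
        }

        int finishedCount() {
            return finishedSplits.size();
        }
    }

    public static void main(String[] args) {
        CountingEnumerator enumerator = new CountingEnumerator();
        // The original attempt and a speculative attempt both finish "split-1".
        enumerator.handleSourceEvent(0, 0, new SplitFinishedEvent("split-1"));
        enumerator.handleSourceEvent(0, 1, new SplitFinishedEvent("split-1"));
        System.out.println(enumerator.finishedCount());
    }
}
```

Running `main` prints `1`. A plain counter incremented per event would report 2 here, which is the kind of mistake the suggested code comment is meant to prevent.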

@wwj6591812 wwj6591812 force-pushed the support_speculative_interface_1029 branch 2 times, most recently from c60a5b5 to 97f7845 Compare October 31, 2024 04:53
@wwj6591812 wwj6591812 force-pushed the support_speculative_interface_1029 branch from 97f7845 to e00ebc7 Compare October 31, 2024 05:56
@zhuzhurk zhuzhurk left a comment

LGTM.

@JingsongLi
Contributor

+1 Thanks @wwj6591812 and @zhuzhurk

@JingsongLi JingsongLi merged commit d373d18 into apache:master Oct 31, 2024
12 checks passed
hang8929201 pushed a commit to hang8929201/paimon that referenced this pull request Nov 7, 2024
3 participants