From e00ebc784837878a7fa5576487cdbad37a885240 Mon Sep 17 00:00:00 2001 From: "hongli.wwj" Date: Tue, 29 Oct 2024 10:15:03 +0800 Subject: [PATCH] [Flink] support speculative execution when batch read paimon table --- ...ortsHandleExecutionAttemptSourceEvent.java | 45 +++++++++++++++++++ .../StaticFileStoreSplitEnumerator.java | 15 ++++++- 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java new file mode 100644 index 000000000000..9d1ca7a43b8f --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An decorative interface of {@link SplitEnumerator} which allows to handle {@link SourceEvent} + * sent from a specific execution attempt. + * + *

The split enumerator must implement this interface if it needs to deal with custom source + * events and is used in cases that a subtask can have multiple concurrent execution attempts, e.g. + * if speculative execution is enabled. Otherwise an error will be thrown when the split enumerator + * receives a custom source event. + */ +@PublicEvolving +public interface SupportsHandleExecutionAttemptSourceEvent { + + /** + * Handles a custom source event from the source reader. It is similar to {@link + * SplitEnumerator#handleSourceEvent(int, SourceEvent)} but is aware of the subtask execution + * attempt who sent this event. + * + * @param subtaskId the subtask id of the source reader who sent the source event. + * @param attemptNumber the attempt number of the source reader who sent the source event. + * @param sourceEvent the source event from the source reader. + */ + void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent); +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java index c30b1c96d2b4..7bd8ff990f86 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java @@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent; import org.apache.flink.table.connector.source.DynamicFilteringEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,8 @@ /** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */ public class StaticFileStoreSplitEnumerator - implements SplitEnumerator { + implements SplitEnumerator, + SupportsHandleExecutionAttemptSourceEvent { private static final Logger LOG = LoggerFactory.getLogger(StaticFileStoreSplitEnumerator.class); @@ -118,6 +120,17 @@ public Snapshot snapshot() { return snapshot; } + @Override + public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) { + // Only recognize events that don't care attemptNumber. + handleSourceEvent(subtaskId, sourceEvent); + } + + /** + * When to support a new kind of event, pay attention that whether the new event can be sent + * multiple times from different attempts of one subtask. If so, it should be handled via method + * {@link #handleSourceEvent(int, int, SourceEvent)} + */ @Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { if (sourceEvent instanceof ReaderConsumeProgressEvent) {