diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java new file mode 100644 index 0000000000000..9d1ca7a43b8f4 --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An decorative interface of {@link SplitEnumerator} which allows to handle {@link SourceEvent} + * sent from a specific execution attempt. + * + *
The split enumerator must implement this interface if it needs to deal with custom source
+ * events and is used in cases that a subtask can have multiple concurrent execution attempts, e.g.
+ * if speculative execution is enabled. Otherwise an error will be thrown when the split enumerator
+ * receives a custom source event.
+ */
+@PublicEvolving
+public interface SupportsHandleExecutionAttemptSourceEvent {
+
+ /**
+ * Handles a custom source event from the source reader. It is similar to {@link
+ * SplitEnumerator#handleSourceEvent(int, SourceEvent)} but is aware of the subtask execution
+ * attempt who sent this event.
+ *
+ * @param subtaskId the subtask id of the source reader who sent the source event.
+ * @param attemptNumber the attempt number of the source reader who sent the source event.
+ * @param sourceEvent the source event from the source reader.
+ */
+ void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent);
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index c30b1c96d2b4f..b36d2f7221084 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -28,6 +28,7 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,8 @@
/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */
public class StaticFileStoreSplitEnumerator
- implements SplitEnumerator