diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index ae89b4163c0c..2e6fb1f38741 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -242,6 +242,18 @@
             <td>Boolean</td>
             <td>If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.</td>
         </tr>
+        <tr>
+            <td><h5>sink.operator-uid.suffix</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
+        </tr>
+        <tr>
+            <td><h5>source.operator-uid.suffix</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Set the uid suffix for the source operators. After setting, the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
+        </tr>
         <tr>
             <td><h5>source.checkpoint-align.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 1adeeef0ed63..55e21a35470e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -414,6 +414,24 @@ public class FlinkConnectorOptions {
+ "in order to compact several changelog files from the same partition into large ones, "
+ "which can decrease the number of small files. ");
+    public static final ConfigOption<String> SOURCE_OPERATOR_UID_SUFFIX =
+ key("source.operator-uid.suffix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Set the uid suffix for the source operators. After setting, the uid format is "
+ + "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will "
+ + "automatically generate the operator uid, which may be incompatible when the topology changes.");
+
+    public static final ConfigOption<String> SINK_OPERATOR_UID_SUFFIX =
+ key("sink.operator-uid.suffix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is "
+ + "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will "
+ + "automatically generate the operator uid, which may be incompatible when the topology changes.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
@@ -429,6 +447,11 @@ public static List<ConfigOption<?>> getOptions() {
return list;
}
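+
+    /** Builds an operator uid of the form {@code ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}}. */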
+ public static String generateCustomUid(
+ String uidPrefix, String tableName, String userDefinedSuffix) {
+ return String.format("%s_%s_%s", uidPrefix, tableName, userDefinedSuffix);
+ }
+
/** The mode of lookup cache. */
public enum LookupCacheMode {
/** Auto mode, try to use partial mode. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index cf697108fd32..c2299a7e8699 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -24,18 +24,22 @@
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.SerializableFunction;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import javax.annotation.Nullable;
import java.util.Map;
import static org.apache.paimon.CoreOptions.createCommitUser;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
/** Sink for dynamic bucket table. */
@@ -43,6 +47,8 @@ public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Integer>>
 
+    private static final String DYNAMIC_BUCKET_ASSIGNER_NAME = "dynamic-bucket-assigner";
+
     public DynamicBucketSink(
             FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
         super(table, overwritePartition);
@@ -88,11 +94,20 @@ public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelis
initialCommitUser, table, numAssigners, extractorFunction(), false);
         TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
-        DataStream<Tuple2<T, Integer>> bucketAssigned =
+        SingleOutputStreamOperator<Tuple2<T, Integer>> bucketAssigned =
partitionByKeyHash
- .transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator)
+                        .transform(
+                                DYNAMIC_BUCKET_ASSIGNER_NAME, rowWithBucketType, assignerOperator)
.setParallelism(partitionByKeyHash.getParallelism());
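+
+        // Pin the bucket assigner uid when a suffix is configured, so its state can still be
+        // matched after the job topology changes.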
+ String uidSuffix = table.options().get(SINK_OPERATOR_UID_SUFFIX.key());
+ if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
+ bucketAssigned =
+ bucketAssigned.uid(
+ generateCustomUid(
+ DYNAMIC_BUCKET_ASSIGNER_NAME, table.name(), uidSuffix));
+ }
+
// 3. shuffle by partition & bucket
         DataStream<Tuple2<T, Integer>> partitionByBucket =
partition(bucketAssigned, channelComputer2(), parallelism);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index d7764f149a55..59f2f4b1035f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -66,7 +66,9 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -228,6 +230,12 @@ public DataStream<Committable> doWrite(
.setParallelism(parallelism == null ? input.getParallelism() : parallelism);
Options options = Options.fromMap(table.options());
+
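+        // Pin the writer uid when a suffix is configured, so writer state can be restored even
+        // if the surrounding topology changes.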
+        String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX);
+        if (uidSuffix != null) {
+ written = written.uid(generateCustomUid(WRITER_NAME, table.name(), uidSuffix));
+ }
+
if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
@@ -295,6 +303,14 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
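+        // Likewise pin the global committer uid so committer state keeps matching after restore.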
+ if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) {
+ committed =
+ committed.uid(
+ generateCustomUid(
+ GLOBAL_COMMITTER_NAME,
+ table.name(),
+ options.get(SINK_OPERATOR_UID_SUFFIX)));
+ }
configureGlobalCommitter(
committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 3131ae0e0afa..ed94043c035d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -34,6 +34,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
@@ -63,6 +64,8 @@
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -73,6 +76,7 @@
* @since 0.8
*/
public class FlinkSourceBuilder {
+ private static final String SOURCE_NAME = "Source";
private final Table table;
private final Options conf;
@@ -210,6 +214,14 @@ private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
: watermarkStrategy,
sourceName,
produceTypeInfo());
+
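+        // Assign an explicit uid to the source when a suffix is configured, keeping source state
+        // addressable across job graph changes.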
+ String uidSuffix = table.options().get(SOURCE_OPERATOR_UID_SUFFIX.key());
+ if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
+ dataStream =
+                    (DataStreamSource<RowData>)
+ dataStream.uid(generateCustomUid(SOURCE_NAME, table.name(), uidSuffix));
+ }
+
if (parallelism != null) {
dataStream.setParallelism(parallelism);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
new file mode 100644
index 000000000000..c46c4c358922
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for flink source / sink restore from savepoint. */
+@SuppressWarnings("BusyWait")
+public class FlinkJobRecoveryITCase extends CatalogITCaseBase {
+
+ private static final String MINI_CLUSTER_FIELD = "miniCluster";
+
+ @BeforeEach
+ @Override
+ public void before() throws IOException {
+ super.before();
+
+        // retain externalized checkpoints on cancellation and disable periodic checkpointing
+ sEnv.getConfig()
+ .getConfiguration()
+ .set(
+ CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+ ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
+ .removeConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL);
+
+ // insert source data
+ batchSql("INSERT INTO source_table1 VALUES (1, 'test-1', '20241030')");
+ batchSql("INSERT INTO source_table1 VALUES (2, 'test-2', '20241030')");
+ batchSql("INSERT INTO source_table1 VALUES (3, 'test-3', '20241030')");
+ batchSql(
+ "INSERT INTO source_table2 VALUES (4, 'test-4', '20241031'), (5, 'test-5', '20241031'), (6, 'test-6', '20241031')");
+ }
+
+ @Override
+    protected List<String> ddl() {
+ return Arrays.asList(
+ String.format(
+ "CREATE CATALOG `fs_catalog` WITH ('type'='paimon', 'warehouse'='%s')",
+ path),
+ "CREATE TABLE IF NOT EXISTS `source_table1` (k INT, f1 STRING, dt STRING) WITH ('bucket'='1', 'bucket-key'='k')",
+ "CREATE TABLE IF NOT EXISTS `source_table2` (k INT, f1 STRING, dt STRING) WITH ('bucket'='1', 'bucket-key'='k')");
+ }
+
+ @ParameterizedTest
+ @EnumSource(BucketMode.class)
+ @Timeout(300)
+ public void testRestoreFromSavepointWithJobGraphChange(BucketMode bucketMode) throws Exception {
+ createTargetTable("target_table", bucketMode);
+ String beforeRecoverSql =
+ "INSERT INTO `target_table` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ SELECT * FROM source_table1 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */";
+ String beforeRecoverCheckSql = "SELECT * FROM target_table";
+        List<Row> beforeRecoverExpectedRows =
+ Arrays.asList(
+ Row.of(1, "test-1", "20241030"),
+ Row.of(2, "test-2", "20241030"),
+ Row.of(3, "test-3", "20241030"));
+ String afterRecoverSql =
+ "INSERT INTO `target_table` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ (SELECT * FROM source_table1 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */ UNION ALL SELECT * FROM source_table2)";
+ String afterRecoverCheckSql = "SELECT * FROM target_table";
+        List<Row> afterRecoverExpectedRows =
+ Arrays.asList(
+ Row.of(1, "test-1", "20241030"),
+ Row.of(2, "test-2", "20241030"),
+ Row.of(3, "test-3", "20241030"),
+ Row.of(4, "test-4", "20241031"),
+ Row.of(5, "test-5", "20241031"),
+ Row.of(6, "test-6", "20241031"));
+ testRecoverFromSavepoint(
+ beforeRecoverSql,
+ beforeRecoverCheckSql,
+ beforeRecoverExpectedRows,
+ afterRecoverSql,
+ afterRecoverCheckSql,
+ afterRecoverExpectedRows,
+ Collections.emptyList(),
+ Pair.of("target_table", "target_table"),
+ Collections.emptyMap());
+ }
+
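+    /**
+     * Restores with a different source query; the old source state is left unclaimed and dropped
+     * via {@link StateRecoveryOptions#SAVEPOINT_IGNORE_UNCLAIMED_STATE}.
+     */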
+ @Test
+ @Timeout(300)
+ public void testRestoreFromSavepointWithIgnoreSourceState() throws Exception {
+ createTargetTable("target_table", BucketMode.HASH_FIXED);
+ String beforeRecoverSql = "INSERT INTO `target_table` SELECT * FROM source_table1";
+ String beforeRecoverCheckSql = "SELECT * FROM target_table";
+        List<Row> beforeRecoverExpectedRows =
+ Arrays.asList(
+ Row.of(1, "test-1", "20241030"),
+ Row.of(2, "test-2", "20241030"),
+ Row.of(3, "test-3", "20241030"));
+ String afterRecoverSql =
+ "INSERT INTO `target_table` SELECT * FROM source_table2 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */";
+ String afterRecoverCheckSql = "SELECT * FROM target_table";
+        List<Row> afterRecoverExpectedRows =
+ Arrays.asList(
+ Row.of(1, "test-1", "20241030"),
+ Row.of(2, "test-2", "20241030"),
+ Row.of(3, "test-3", "20241030"),
+ Row.of(4, "test-4", "20241031"),
+ Row.of(5, "test-5", "20241031"),
+ Row.of(6, "test-6", "20241031"));
+ testRecoverFromSavepoint(
+ beforeRecoverSql,
+ beforeRecoverCheckSql,
+ beforeRecoverExpectedRows,
+ afterRecoverSql,
+ afterRecoverCheckSql,
+ afterRecoverExpectedRows,
+ Collections.emptyList(),
+ Pair.of("target_table", "target_table"),
+ Collections.singletonMap(
+ StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true"));
+ }
+
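+    /**
+     * Restores into a different sink table with an explicit sink uid; the old sink state stays
+     * unclaimed and is ignored via {@link StateRecoveryOptions#SAVEPOINT_IGNORE_UNCLAIMED_STATE}.
+     */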
+ @Test
+ @Timeout(300)
+ public void testRestoreFromSavepointWithIgnoreSinkState() throws Exception {
+ createTargetTable("target_table", BucketMode.HASH_FIXED);
+ createTargetTable("target_table2", BucketMode.HASH_FIXED);
+
+ String beforeRecoverSql = "INSERT INTO `target_table` SELECT * FROM source_table1";
+ String beforeRecoverCheckSql = "SELECT * FROM target_table";
+        List<Row> beforeRecoverExpectedRows =
+ Arrays.asList(
+ Row.of(1, "test-1", "20241030"),
+ Row.of(2, "test-2", "20241030"),
+ Row.of(3, "test-3", "20241030"));
+ String afterRecoverSql =
+ "INSERT INTO `target_table2` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ SELECT * FROM source_table1";
+ String afterRecoverCheckSql = "SELECT * FROM target_table2";
+        List<Row> afterRecoverExpectedRows =
+ Arrays.asList(
+ Row.of(7, "test-7", "20241030"),
+ Row.of(8, "test-8", "20241030"),
+ Row.of(9, "test-9", "20241030"));
+ String updateSql =
+ "INSERT INTO source_table1 VALUES (7, 'test-7', '20241030'), (8, 'test-8', '20241030'), (9, 'test-9', '20241030')";
+ testRecoverFromSavepoint(
+ beforeRecoverSql,
+ beforeRecoverCheckSql,
+ beforeRecoverExpectedRows,
+ afterRecoverSql,
+ afterRecoverCheckSql,
+ afterRecoverExpectedRows,
+ Collections.singletonList(updateSql),
+ Pair.of("target_table", "target_table2"),
+ Collections.singletonMap(
+ StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true"));
+ }
+
+ private void testRecoverFromSavepoint(
+ String beforeRecoverSql,
+ String beforeRecoverCheckSql,
+            List<Row> beforeRecoverExpectedRows,
+            String afterRecoverSql,
+            String afterRecoverCheckSql,
+            List<Row> afterRecoverExpectedRows,
+            List<String> updateSql,
+            Pair<String, String> targetTables,
+            Map<String, String> recoverOptions)
+ throws Exception {
+
+ //noinspection OptionalGetWithoutIsPresent
+ JobClient jobClient = sEnv.executeSql(beforeRecoverSql).getJobClient().get();
+ String checkpointPath =
+ triggerCheckpointAndWaitForWrites(
+ jobClient, targetTables.getLeft(), beforeRecoverExpectedRows.size());
+ jobClient.cancel().get();
+
+        List<Row> rows = batchSql(beforeRecoverCheckSql);
+ assertThat(rows.size()).isEqualTo(beforeRecoverExpectedRows.size());
+ assertThat(rows).containsExactlyInAnyOrder(beforeRecoverExpectedRows.toArray(new Row[0]));
+
+ for (String sql : updateSql) {
+ batchSql(sql);
+ }
+
+ Configuration config =
+ sEnv.getConfig()
+ .getConfiguration()
+ .set(StateRecoveryOptions.SAVEPOINT_PATH, checkpointPath);
+        for (Map.Entry<String, String> entry : recoverOptions.entrySet()) {
+ config.setString(entry.getKey(), entry.getValue());
+ }
+
+ //noinspection OptionalGetWithoutIsPresent
+ jobClient = sEnv.executeSql(afterRecoverSql).getJobClient().get();
+ triggerCheckpointAndWaitForWrites(
+ jobClient, targetTables.getRight(), afterRecoverExpectedRows.size());
+ jobClient.cancel().get();
+
+ rows = batchSql(afterRecoverCheckSql);
+ assertThat(rows.size()).isEqualTo(afterRecoverExpectedRows.size());
+ assertThat(rows).containsExactlyInAnyOrder(afterRecoverExpectedRows.toArray(new Row[0]));
+ }
+
+ private void createTargetTable(String tableName, BucketMode bucketMode) {
+ switch (bucketMode) {
+ case HASH_FIXED:
+ batchSql(
+ String.format(
+ "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k, pt) NOT ENFORCED) WITH ('bucket'='2', 'commit.force-create-snapshot'='true')",
+ tableName));
+ return;
+ case HASH_DYNAMIC:
+ batchSql(
+ String.format(
+ "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k, pt) NOT ENFORCED) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')",
+ tableName));
+ return;
+ case CROSS_PARTITION:
+ batchSql(
+ String.format(
+ "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k) NOT ENFORCED) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')",
+ tableName));
+ return;
+ case BUCKET_UNAWARE:
+ batchSql(
+ String.format(
+ "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')",
+ tableName));
+ return;
+ default:
+ throw new IllegalArgumentException("Unsupported bucket mode: " + bucketMode);
+ }
+ }
+
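+    /** Polls until the table has a snapshot whose id differs from {@code initialSnapshot}. */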
+ private Snapshot waitForNewSnapshot(String tableName, long initialSnapshot)
+ throws InterruptedException {
+ Snapshot snapshot = findLatestSnapshot(tableName);
+ while (snapshot == null || snapshot.id() == initialSnapshot) {
+ Thread.sleep(2000L);
+ snapshot = findLatestSnapshot(tableName);
+ }
+ return snapshot;
+ }
+
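+    /** Fetches the {@link MiniCluster} behind the job client via reflection on its private field. */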
+ @SuppressWarnings("unchecked")
+    private <T> T reflectGetMiniCluster(Object instance)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field = instance.getClass().getDeclaredField(MINI_CLUSTER_FIELD);
+ field.setAccessible(true);
+ return (T) field.get(instance);
+ }
+
+ private String triggerCheckpointAndWaitForWrites(
+ JobClient jobClient, String targetTable, long totalRecords) throws Exception {
+ //noinspection resource
+ MiniCluster miniCluster = reflectGetMiniCluster(jobClient);
+ JobID jobID = jobClient.getJobID();
+ JobStatus jobStatus = jobClient.getJobStatus().get();
+ while (jobStatus == JobStatus.INITIALIZING || jobStatus == JobStatus.CREATED) {
+ Thread.sleep(2000L);
+ jobStatus = jobClient.getJobStatus().get();
+ }
+
+ if (jobStatus != JobStatus.RUNNING) {
+ throw new IllegalStateException("Job status is not RUNNING");
+ }
+
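+        // Busy-wait until every execution vertex is RUNNING before triggering checkpoints.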
+ AtomicBoolean allTaskRunning = new AtomicBoolean(false);
+ while (!allTaskRunning.get()) {
+ allTaskRunning.set(true);
+ Thread.sleep(2000L);
+ miniCluster
+ .getExecutionGraph(jobID)
+ .thenAccept(
+ eg ->
+ eg.getAllExecutionVertices()
+ .forEach(
+ ev -> {
+ if (ev.getExecutionState()
+ != ExecutionState.RUNNING) {
+ allTaskRunning.set(false);
+ }
+ }))
+ .get();
+ }
+
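+        // Trigger checkpoints until the committed snapshot contains all expected records and
+        // return the path of the last checkpoint for later recovery.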
+ String checkpointPath = miniCluster.triggerCheckpoint(jobID).get();
+ Snapshot snapshot = waitForNewSnapshot(targetTable, -1L);
+ //noinspection DataFlowIssue
+ while (snapshot.totalRecordCount() < totalRecords) {
+ checkpointPath = miniCluster.triggerCheckpoint(jobID).get();
+ snapshot = waitForNewSnapshot(targetTable, snapshot.id());
+ }
+
+ return checkpointPath;
+ }
+}