From cdcf0e13a01c7e6eb0478e968afa7a27dad00d80 Mon Sep 17 00:00:00 2001
From: "liming.1018"
Date: Wed, 30 Oct 2024 15:45:37 +0800
Subject: [PATCH] [flink] Support setting uid suffix for source/sink to
 improve state compatibility when the job graph changes.

---
 .../flink_connector_configuration.html       |  12 +
 .../paimon/flink/FlinkConnectorOptions.java  |  18 +
 .../paimon/flink/sink/DynamicBucketSink.java |  19 +-
 .../apache/paimon/flink/sink/FlinkSink.java  |  15 +
 .../flink/source/FlinkSourceBuilder.java     |  13 +
 .../paimon/flink/FlinkJobRecoveryITCase.java | 320 ++++++++++++++++++
 6 files changed, 395 insertions(+), 2 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 4e12c88bcdec5..91074068a8c35 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -236,6 +236,18 @@
             <td>Boolean</td>
             <td>If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.</td>
         </tr>
+        <tr>
+            <td><h5>sink.operator-uid.suffix</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
+        </tr>
+        <tr>
+            <td><h5>source.operator-uid.suffix</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Set the uid suffix for the source operators. After setting, the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
+        </tr>
         <tr>
             <td><h5>source.checkpoint-align.enabled</h5></td>
false diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 73b00460190ea..1bdf630335d07 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -405,6 +405,24 @@ public class FlinkConnectorOptions { .withDescription( "Optional endInput watermark used in case of batch mode or bounded stream."); + public static final ConfigOption SOURCE_OPERATOR_UID_SUFFIX = + key("source.operator-uid.suffix") + .stringType() + .noDefaultValue() + .withDescription( + "Set the uid suffix for the source operators. After setting, the uid format is " + + "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will " + + "automatically generate the operator uid, which may be incompatible when the topology changes."); + + public static final ConfigOption SINK_OPERATOR_UID_SUFFIX = + key("sink.operator-uid.suffix") + .stringType() + .noDefaultValue() + .withDescription( + "Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is " + + "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will " + + "automatically generate the operator uid, which may be incompatible when the topology changes."); + public static List> getOptions() { final Field[] fields = FlinkConnectorOptions.class.getFields(); final List> list = new ArrayList<>(fields.length); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java index cf697108fd326..ef4b6495649c1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java @@ -24,18 +24,21 @@ import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.sink.PartitionKeyExtractor; import org.apache.paimon.utils.SerializableFunction; +import org.apache.paimon.utils.StringUtils; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import javax.annotation.Nullable; import java.util.Map; import static org.apache.paimon.CoreOptions.createCommitUser; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; /** Sink for dynamic bucket table. 
*/ @@ -43,6 +46,8 @@ public abstract class DynamicBucketSink extends FlinkWriteSink overwritePartition) { super(table, overwritePartition); @@ -88,11 +93,21 @@ public DataStreamSink build(DataStream input, @Nullable Integer parallelis initialCommitUser, table, numAssigners, extractorFunction(), false); TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO); - DataStream> bucketAssigned = + SingleOutputStreamOperator> bucketAssigned = partitionByKeyHash - .transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator) + .transform( + DYNAMIC_BUCKET_ASSIGNER_NAME, rowWithBucketType, assignerOperator) .setParallelism(partitionByKeyHash.getParallelism()); + String uidSuffix = table.options().get(SINK_OPERATOR_UID_SUFFIX.key()); + if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) { + bucketAssigned = + bucketAssigned.uid( + String.format( + "%s_%s_%s", + DYNAMIC_BUCKET_ASSIGNER_NAME, table.name(), uidSuffix)); + } + // 3. shuffle by partition & bucket DataStream> partitionByBucket = partition(bucketAssigned, channelComputer2(), parallelism); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index e483e3c19f740..fcdc485afaf4d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -61,6 +61,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY; import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -223,6 +224,12 @@ public DataStream doWrite( .setParallelism(parallelism == null ? 
input.getParallelism() : parallelism); Options options = Options.fromMap(table.options()); + + String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX); + if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) { + written = written.uid(String.format("%s_%s_%s", WRITER_NAME, table.name(), uidSuffix)); + } + if (options.get(SINK_USE_MANAGED_MEMORY)) { declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); } @@ -274,6 +281,14 @@ protected DataStreamSink doCommit(DataStream written, String com committerOperator) .setParallelism(1) .setMaxParallelism(1); + if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) { + committed.uid( + String.format( + "%s_%s_%s", + GLOBAL_COMMITTER_NAME, + table.name(), + options.get(SINK_OPERATOR_UID_SUFFIX))); + } configureGlobalCommitter( committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY)); return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 3131ae0e0afa8..3d7f5c1f0ef9f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.utils.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; @@ -63,6 +64,7 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE; +import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -73,6 +75,7 @@ * @since 0.8 */ public class FlinkSourceBuilder { + private static final String SOURCE_NAME = "Source"; private final Table table; private final Options conf; @@ -210,6 +213,16 @@ private DataStream toDataStream(Source source) { : watermarkStrategy, sourceName, produceTypeInfo()); + + String uidSuffix = table.options().get(SOURCE_OPERATOR_UID_SUFFIX.key()); + if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) { + dataStream = + (DataStreamSource) + dataStream.uid( + String.format( + "%s_%s_%s", SOURCE_NAME, table.name(), uidSuffix)); + } + if (parallelism != null) { dataStream.setParallelism(parallelism); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java new file mode 100644 index 0000000000000..1176aed34fc67 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.BucketMode; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; +import org.apache.flink.configuration.StateRecoveryOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.types.Row; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test case for flink source / sink restore from savepoint. 
*/ +public class FlinkJobRecoveryITCase extends CatalogITCaseBase { + + @BeforeEach + @Override + public void before() throws IOException { + super.before(); + + // disable checkpoint + sEnv.getConfig() + .getConfiguration() + .set( + CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) + .removeConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL); + + // insert source data + batchSql("INSERT INTO source_table1 VALUES (1, 'test-1', '20241030')"); + batchSql("INSERT INTO source_table1 VALUES (2, 'test-2', '20241030')"); + batchSql("INSERT INTO source_table1 VALUES (3, 'test-3', '20241030')"); + batchSql( + "INSERT INTO source_table2 VALUES (4, 'test-4', '20241031'), (5, 'test-5', '20241031'), (6, 'test-6', '20241031')"); + } + + @Override + protected List ddl() { + return Arrays.asList( + String.format( + "CREATE CATALOG `fs_catalog` WITH ('type'='paimon', 'warehouse'='%s')", + path), + "CREATE TABLE IF NOT EXISTS `source_table1` (k INT, f1 STRING, dt STRING) WITH ('bucket'='1', 'bucket-key'='k')", + "CREATE TABLE IF NOT EXISTS `source_table2` (k INT, f1 STRING, dt STRING) WITH ('bucket'='1', 'bucket-key'='k')"); + } + + @ParameterizedTest + @EnumSource(BucketMode.class) + @Timeout(300) + public void testRestoreFromSavepointWithJobGraphChange(BucketMode bucketMode) throws Exception { + createTargetTable("target_table", bucketMode); + String beforeRecoverSql = + "INSERT INTO `target_table` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ SELECT * FROM source_table1 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */"; + String beforeRecoverCheckSql = "SELECT * FROM target_table"; + List beforeRecoverExpectedRows = + Arrays.asList( + Row.of(1, "test-1", "20241030"), + Row.of(2, "test-2", "20241030"), + Row.of(3, "test-3", "20241030")); + String afterRecoverSql = + "INSERT INTO `target_table` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ (SELECT * FROM source_table1 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */ UNION ALL SELECT * FROM source_table2)"; + String afterRecoverCheckSql = "SELECT * FROM target_table"; + List afterRecoverExpectedRows = + Arrays.asList( + Row.of(1, "test-1", "20241030"), + Row.of(2, "test-2", "20241030"), + Row.of(3, "test-3", "20241030"), + Row.of(4, "test-4", "20241031"), + Row.of(5, "test-5", "20241031"), + Row.of(6, "test-6", "20241031")); + testRecoverFromSavepoint( + beforeRecoverSql, + beforeRecoverCheckSql, + beforeRecoverExpectedRows, + afterRecoverSql, + afterRecoverCheckSql, + afterRecoverExpectedRows, + Collections.emptyList(), + Collections.emptyMap()); + } + + @Test + @Timeout(300) + public void testRestoreFromSavepointWithIgnoreSourceState() throws Exception { + createTargetTable("target_table", BucketMode.HASH_FIXED); + String beforeRecoverSql = "INSERT INTO `target_table` SELECT * FROM source_table1"; + String beforeRecoverCheckSql = "SELECT * FROM target_table"; + List beforeRecoverExpectedRows = + Arrays.asList( + Row.of(1, "test-1", "20241030"), + Row.of(2, "test-2", "20241030"), + Row.of(3, "test-3", "20241030")); + String afterRecoverSql = + "INSERT INTO `target_table` SELECT * FROM source_table2 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */"; + String afterRecoverCheckSql = "SELECT * FROM target_table"; + List afterRecoverExpectedRows = + Arrays.asList( + Row.of(1, "test-1", "20241030"), + Row.of(2, "test-2", "20241030"), + Row.of(3, "test-3", "20241030"), + Row.of(4, "test-4", "20241031"), + Row.of(5, "test-5", "20241031"), + Row.of(6, "test-6", 
"20241031")); + testRecoverFromSavepoint( + beforeRecoverSql, + beforeRecoverCheckSql, + beforeRecoverExpectedRows, + afterRecoverSql, + afterRecoverCheckSql, + afterRecoverExpectedRows, + Collections.emptyList(), + Collections.singletonMap( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true")); + } + + @Test + @Timeout(300) + public void testRestoreFromSavepointWithIgnoreSinkState() throws Exception { + createTargetTable("target_table", BucketMode.HASH_FIXED); + createTargetTable("target_table2", BucketMode.HASH_FIXED); + + String beforeRecoverSql = "INSERT INTO `target_table` SELECT * FROM source_table1"; + String beforeRecoverCheckSql = "SELECT * FROM target_table"; + List beforeRecoverExpectedRows = + Arrays.asList( + Row.of(1, "test-1", "20241030"), + Row.of(2, "test-2", "20241030"), + Row.of(3, "test-3", "20241030")); + String afterRecoverSql = + "INSERT INTO `target_table2` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ SELECT * FROM source_table1"; + String afterRecoverCheckSql = "SELECT * FROM target_table2"; + List afterRecoverExpectedRows = + Arrays.asList( + Row.of(7, "test-7", "20241030"), + Row.of(8, "test-8", "20241030"), + Row.of(9, "test-9", "20241030")); + String updateSql = + "INSERT INTO source_table1 VALUES (7, 'test-7', '20241030'), (8, 'test-8', '20241030'), (9, 'test-9', '20241030')"; + testRecoverFromSavepoint( + beforeRecoverSql, + beforeRecoverCheckSql, + beforeRecoverExpectedRows, + afterRecoverSql, + afterRecoverCheckSql, + afterRecoverExpectedRows, + Collections.singletonList(updateSql), + Collections.singletonMap( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true")); + } + + private void testRecoverFromSavepoint( + String beforeRecoverSql, + String beforeRecoverCheckSql, + List beforeRecoverExpectedRows, + String afterRecoverSql, + String afterRecoverCheckSql, + List afterRecoverExpectedRows, + List updateSql, + Map recoverOptions) + throws Exception { + + JobClient jobClient = sEnv.executeSql(beforeRecoverSql).getJobClient().get(); + String checkpointPath = + triggerCheckpointAndWaitForWrites(jobClient, beforeRecoverExpectedRows.size()); + jobClient.cancel().get(); + + List rows = batchSql(beforeRecoverCheckSql); + assertThat(rows.size()).isEqualTo(beforeRecoverExpectedRows.size()); + assertThat(rows).containsExactlyInAnyOrder(beforeRecoverExpectedRows.toArray(new Row[0])); + + for (String sql : updateSql) { + batchSql(sql); + } + + Configuration config = + sEnv.getConfig() + .getConfiguration() + .set(StateRecoveryOptions.SAVEPOINT_PATH, checkpointPath); + for (Map.Entry entry : recoverOptions.entrySet()) { + config.setString(entry.getKey(), entry.getValue()); + } + + jobClient = sEnv.executeSql(afterRecoverSql).getJobClient().get(); + triggerCheckpointAndWaitForWrites(jobClient, afterRecoverExpectedRows.size()); + jobClient.cancel().get(); + + rows = batchSql(afterRecoverCheckSql); + assertThat(rows.size()).isEqualTo(afterRecoverExpectedRows.size()); + assertThat(rows).containsExactlyInAnyOrder(afterRecoverExpectedRows.toArray(new Row[0])); + } + + private void createTargetTable(String tableName, BucketMode bucketMode) { + switch (bucketMode) { + case HASH_FIXED: + batchSql( + String.format( + "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k, pt) NOT ENFORCED) WITH ('bucket'='2', 'commit.force-create-snapshot'='true')", + tableName)); + return; + case HASH_DYNAMIC: + batchSql( + String.format( + "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k, pt) NOT 
ENFORCED) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')", + tableName)); + return; + case CROSS_PARTITION: + batchSql( + String.format( + "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k) NOT ENFORCED) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')", + tableName)); + return; + case BUCKET_UNAWARE: + batchSql( + String.format( + "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')", + tableName)); + return; + default: + throw new IllegalArgumentException("Unsupported bucket mode: " + bucketMode); + } + } + + private Snapshot waitForNewSnapshot(String tableName, long initialSnapshot) + throws InterruptedException { + Snapshot snapshot = findLatestSnapshot(tableName); + while (snapshot == null || snapshot.id() == initialSnapshot) { + Thread.sleep(2000L); + snapshot = findLatestSnapshot(tableName); + } + return snapshot; + } + + @SuppressWarnings("unchecked") + private <T> T reflectGetField(Object instance, String fieldName) + throws NoSuchFieldException, IllegalAccessException { + Field field = instance.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(instance); + } + + private String triggerCheckpointAndWaitForWrites(JobClient jobClient, long totalRecords) + throws Exception { + MiniCluster miniCluster = reflectGetField(jobClient, "miniCluster"); + JobID jobID = jobClient.getJobID(); + JobStatus jobStatus = jobClient.getJobStatus().get(); + while (jobStatus == JobStatus.INITIALIZING || jobStatus == JobStatus.CREATED) { + Thread.sleep(2000L); + jobStatus = jobClient.getJobStatus().get(); + } + + if (jobStatus != JobStatus.RUNNING) { + throw new IllegalStateException("Job status is not RUNNING"); + } + + AtomicBoolean allTaskRunning = new AtomicBoolean(false); + while (!allTaskRunning.get()) { + allTaskRunning.set(true); + Thread.sleep(2000L); + miniCluster + .getExecutionGraph(jobID) + .thenAccept( + eg -> + eg.getAllExecutionVertices() + .forEach( + ev -> { + if (ev.getExecutionState() + != ExecutionState.RUNNING) { + allTaskRunning.set(false); + } + })) + .get(); + } + + String checkpointPath = miniCluster.triggerCheckpoint(jobID).get(); + Snapshot snapshot = waitForNewSnapshot("target_table", -1L); + while (snapshot.totalRecordCount() < totalRecords) { + checkpointPath = miniCluster.triggerCheckpoint(jobID).get(); + snapshot = waitForNewSnapshot("target_table", snapshot.id()); + } + + return checkpointPath; + } +}
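
Usage sketch: the new options can be supplied per query through dynamic table option hints, as in the ITCase above, and should equally apply when set in the table's WITH clause, since the source builder and the sinks read them from the table options. The table names and the suffix value 'my-uid' below are illustrative placeholders, not part of the patch.

    -- 'my_target', 'my_source' and 'my-uid' are placeholder names.
    -- Pinning the uid suffix keeps operator state addressable after the job graph changes.
    INSERT INTO `my_target` /*+ OPTIONS('sink.operator-uid.suffix'='my-uid') */
    SELECT * FROM `my_source` /*+ OPTIONS('source.operator-uid.suffix'='my-uid') */;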