diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java index 24823a78927f..5b1b53d6326a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java @@ -183,7 +183,10 @@ public PartitionMarkDoneTriggerState(boolean isRestored, OperatorStateStore stat public List restore() throws Exception { List pendingPartitions = new ArrayList<>(); if (isRestored) { - pendingPartitions.addAll(pendingPartitionsState.get().iterator().next()); + Iterator> state = pendingPartitionsState.get().iterator(); + if (state.hasNext()) { + pendingPartitions.addAll(state.next()); + } } return pendingPartitions; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index b2764fc37c6e..c96db63cb52d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.utils.TestingMetricUtils; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.CompactIncrement; @@ -37,6 +38,7 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.ThrowingConsumer; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; @@ -281,6 +283,34 @@ public void testRestoreCommitUser() throws Exception { Assertions.assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser)); } + @Test + public void testRestoreEmptyMarkDoneState() throws Exception { + FileStoreTable table = createFileStoreTable(o -> {}, Collections.singletonList("b")); + + String commitUser = UUID.randomUUID().toString(); + + // 1. Generate operatorSubtaskState + OperatorSubtaskState snapshot; + { + OneInputStreamOperatorTestHarness testHarness = + createLossyTestHarness(table, commitUser); + + testHarness.open(); + snapshot = writeAndSnapshot(table, commitUser, 1, 1, testHarness); + testHarness.close(); + } + // 2. enable mark done. + table = + table.copy( + ImmutableMap.of( + FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE.key(), "1h")); + + // 3. restore from state. + OneInputStreamOperatorTestHarness testHarness = + createLossyTestHarness(table); + testHarness.initializeState(snapshot); + } + @Test public void testCommitInputEnd() throws Exception { FileStoreTable table = createFileStoreTable(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java index 77b53ba7069d..a69f8dbd3a35 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java @@ -88,10 +88,15 @@ protected void assertResults(FileStoreTable table, String... expected) { } protected FileStoreTable createFileStoreTable() throws Exception { - return createFileStoreTable(options -> {}); + return createFileStoreTable(options -> {}, Collections.emptyList()); } protected FileStoreTable createFileStoreTable(Consumer setOptions) throws Exception { + return createFileStoreTable(setOptions, Collections.emptyList()); + } + + protected FileStoreTable createFileStoreTable( + Consumer setOptions, List partitionKeys) throws Exception { Options conf = new Options(); conf.set(CoreOptions.PATH, tablePath.toString()); conf.setString("bucket", "1"); @@ -101,7 +106,7 @@ protected FileStoreTable createFileStoreTable(Consumer setOptions) thro schemaManager.createTable( new Schema( ROW_TYPE.getFields(), - Collections.emptyList(), + partitionKeys, Collections.emptyList(), conf.toMap(), ""));