Skip to content

Commit

Permalink
[core] Fix NoSuchElementException when enable partition mark done (#4356
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Aitozi authored Oct 21, 2024
1 parent 87321e1 commit 936a842
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ public PartitionMarkDoneTriggerState(boolean isRestored, OperatorStateStore stat
public List<String> restore() throws Exception {
List<String> pendingPartitions = new ArrayList<>();
if (isRestored) {
pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
Iterator<List<String>> state = pendingPartitionsState.get().iterator();
if (state.hasNext()) {
pendingPartitions.addAll(state.next());
}
}
return pendingPartitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
Expand All @@ -37,6 +38,7 @@
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.ThrowingConsumer;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.apache.flink.api.common.ExecutionConfig;
Expand Down Expand Up @@ -281,6 +283,34 @@ public void testRestoreCommitUser() throws Exception {
Assertions.assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser));
}

@Test
public void testRestoreEmptyMarkDoneState() throws Exception {
FileStoreTable table = createFileStoreTable(o -> {}, Collections.singletonList("b"));

String commitUser = UUID.randomUUID().toString();

// 1. Generate operatorSubtaskState
OperatorSubtaskState snapshot;
{
OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
createLossyTestHarness(table, commitUser);

testHarness.open();
snapshot = writeAndSnapshot(table, commitUser, 1, 1, testHarness);
testHarness.close();
}
// 2. enable mark done.
table =
table.copy(
ImmutableMap.of(
FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE.key(), "1h"));

// 3. restore from state.
OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
createLossyTestHarness(table);
testHarness.initializeState(snapshot);
}

@Test
public void testCommitInputEnd() throws Exception {
FileStoreTable table = createFileStoreTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,15 @@ protected void assertResults(FileStoreTable table, String... expected) {
}

protected FileStoreTable createFileStoreTable() throws Exception {
return createFileStoreTable(options -> {});
return createFileStoreTable(options -> {}, Collections.emptyList());
}

protected FileStoreTable createFileStoreTable(Consumer<Options> setOptions) throws Exception {
return createFileStoreTable(setOptions, Collections.emptyList());
}

protected FileStoreTable createFileStoreTable(
Consumer<Options> setOptions, List<String> partitionKeys) throws Exception {
Options conf = new Options();
conf.set(CoreOptions.PATH, tablePath.toString());
conf.setString("bucket", "1");
Expand All @@ -101,7 +106,7 @@ protected FileStoreTable createFileStoreTable(Consumer<Options> setOptions) thro
schemaManager.createTable(
new Schema(
ROW_TYPE.getFields(),
Collections.emptyList(),
partitionKeys,
Collections.emptyList(),
conf.toMap(),
""));
Expand Down

0 comments on commit 936a842

Please sign in to comment.