From a1beb7a3ae63628453471b29afa505d0f734b7e0 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Wed, 3 Apr 2024 09:59:40 +0800 Subject: [PATCH] [hotfix] Fix unstable test CompactDatabaseActionITCase#testCombinedModeWithDynamicOptions (#3150) --- .../action/CompactDatabaseActionITCase.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java index 0a026490c1f1..d7104f591893 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java @@ -35,7 +35,6 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.CommonTestUtils; import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.core.execution.JobClient; @@ -56,6 +55,7 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.paimon.utils.CommonTestUtils.waitUtil; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for {@link CompactDatabaseAction}. */ @@ -291,7 +291,7 @@ public void testStreamingCompact(String mode) throws Exception { 60_000); // assert dedicated compact job will expire snapshots - CommonTestUtils.waitUtil( + waitUtil( () -> snapshotManager.latestSnapshotId() - 2 == snapshotManager.earliestSnapshotId(), @@ -384,7 +384,7 @@ public void testStreamingCompact(String mode) throws Exception { 60_000); // assert dedicated compact job will expire snapshots - CommonTestUtils.waitUtil( + waitUtil( () -> snapshotManager.latestSnapshotId() - 2 == snapshotManager.earliestSnapshotId(), @@ -751,16 +751,20 @@ public void testCombinedModeWithDynamicOptions() throws Exception { action.withStreamExecutionEnvironment(env).build(); JobClient jobClient = env.executeAsync(); - CommonTestUtils.waitUtil( + waitUtil( () -> snapshotManager.latestSnapshotId() == 11L, Duration.ofSeconds(60), - Duration.ofMillis(100)); + Duration.ofMillis(500)); jobClient.cancel(); - Snapshot latest = snapshotManager.latestSnapshot(); - Snapshot earliest = snapshotManager.earliestSnapshot(); - assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT); - assertThat(latest.id() - earliest.id()).isEqualTo(2); + assertThat(snapshotManager.latestSnapshot().commitKind()) + .isEqualTo(Snapshot.CommitKind.COMPACT); + + waitUtil( + () -> snapshotManager.earliestSnapshotId() == 9L, + Duration.ofSeconds(60), + Duration.ofMillis(200), + "Failed to wait snapshot expiration success"); } private void writeData(