Skip to content

Commit

Permalink
[hotfix] Fix unstable test CompactDatabaseActionITCase#testCombinedMo…
Browse files Browse the repository at this point in the history
…deWithDynamicOptions (#3150)
  • Loading branch information
yuzelin authored Apr 3, 2024
1 parent 6757f56 commit a1beb7a
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.flink.core.execution.JobClient;
Expand All @@ -56,6 +55,7 @@
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.paimon.utils.CommonTestUtils.waitUtil;
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link CompactDatabaseAction}. */
Expand Down Expand Up @@ -291,7 +291,7 @@ public void testStreamingCompact(String mode) throws Exception {
60_000);

// assert dedicated compact job will expire snapshots
CommonTestUtils.waitUtil(
waitUtil(
() ->
snapshotManager.latestSnapshotId() - 2
== snapshotManager.earliestSnapshotId(),
Expand Down Expand Up @@ -384,7 +384,7 @@ public void testStreamingCompact(String mode) throws Exception {
60_000);

// assert dedicated compact job will expire snapshots
CommonTestUtils.waitUtil(
waitUtil(
() ->
snapshotManager.latestSnapshotId() - 2
== snapshotManager.earliestSnapshotId(),
Expand Down Expand Up @@ -751,16 +751,20 @@ public void testCombinedModeWithDynamicOptions() throws Exception {
action.withStreamExecutionEnvironment(env).build();
JobClient jobClient = env.executeAsync();

CommonTestUtils.waitUtil(
waitUtil(
() -> snapshotManager.latestSnapshotId() == 11L,
Duration.ofSeconds(60),
Duration.ofMillis(100));
Duration.ofMillis(500));
jobClient.cancel();

Snapshot latest = snapshotManager.latestSnapshot();
Snapshot earliest = snapshotManager.earliestSnapshot();
assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
assertThat(latest.id() - earliest.id()).isEqualTo(2);
assertThat(snapshotManager.latestSnapshot().commitKind())
.isEqualTo(Snapshot.CommitKind.COMPACT);

waitUtil(
() -> snapshotManager.earliestSnapshotId() == 9L,
Duration.ofSeconds(60),
Duration.ofMillis(200),
"Failed to wait snapshot expiration success");
}

private void writeData(
Expand Down

0 comments on commit a1beb7a

Please sign in to comment.