Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Sep 10, 2024
1 parent 5f7ad3f commit d3143f8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void addSplits(Collection<DorisSourceSplit> splits) {

@Override
public PendingSplitsCheckpoint snapshotState(long checkpointId) {
LOG.info("Checkpointing {} splits: {}", checkpointId, splits);
LOG.info("Checkpointing splits: {}, id {}", splits, checkpointId);
return new PendingSplitsCheckpoint(splits);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void testJobManagerFailoverSource() throws Exception {
initializeTableWithData(TABLE_CSV_JM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(100L);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
Expand All @@ -372,7 +372,7 @@ public void testJobManagerFailoverSource() throws Exception {
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();

List<String> expectedSnapshotData = getExpectedData();
List<String> expectedData = getExpectedData();
if (iterator.hasNext()) {
LOG.info("trigger jobmanager failover...");
triggerFailover(
Expand All @@ -381,10 +381,9 @@ public void testJobManagerFailoverSource() throws Exception {
miniClusterResource.getMiniCluster(),
() -> sleepMs(100));
}
checkResultInAnyOrder(
"testJobManagerFailoverSource",
expectedSnapshotData.toArray(),
fetchRows(iterator).toArray());
List<String> actual = fetchRows(iterator);
LOG.info("actual data: {}, expected: {}", actual, expectedData);
Assert.assertTrue(actual.containsAll(expectedData));
}

private static List<String> getExpectedData() {
Expand Down Expand Up @@ -427,7 +426,7 @@ public void testTaskManagerFailoverSource() throws Exception {
initializeTableWithData(TABLE_CSV_TM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(100L);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
Expand All @@ -450,7 +449,7 @@ public void testTaskManagerFailoverSource() throws Exception {
TableResult tableResult = tEnv.executeSql("select * from doris_source_tm");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = getExpectedData();
List<String> expectedData = getExpectedData();
if (iterator.hasNext()) {
LOG.info("trigger taskmanager failover...");
triggerFailover(
Expand All @@ -460,10 +459,9 @@ public void testTaskManagerFailoverSource() throws Exception {
() -> sleepMs(100));
}

checkResultInAnyOrder(
"testTaskManagerFailoverSource",
expectedSnapshotData.toArray(),
fetchRows(iterator).toArray());
List<String> actual = fetchRows(iterator);
LOG.info("actual data: {}, expected: {}", actual, expectedData);
Assert.assertTrue(actual.containsAll(expectedData));
}

private void checkResult(String testName, Object[] expected, Object[] actual) {
Expand Down

0 comments on commit d3143f8

Please sign in to comment.