Skip to content

Commit

Permalink
[issue-719] Explicitly cancel outstanding checkpoints when checkpoints get stuck (#720)
Browse files Browse the repository at this point in the history

Signed-off-by: Brian Zhou <[email protected]>
  • Loading branch information
crazyzhou authored Sep 19, 2023
1 parent cf2c995 commit da4c706
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ testcontainersVersion=1.15.3

# Version and base tags can be overridden at build time.
connectorVersion=0.13.0-SNAPSHOT
pravegaVersion=0.13.0-3151.28c485f-SNAPSHOT
pravegaVersion=0.13.0-3184.23d8ef6-SNAPSHOT
schemaRegistryVersion=0.6.0-88.65039cd-SNAPSHOT
apacheCommonsVersion=3.7

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderGroupNotFoundException;
import io.pravega.client.stream.impl.MaxNumberOfCheckpointsExceededException;
import io.pravega.connectors.flink.serialization.CheckpointSerializer;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.SimpleVersionedSerializer;
Expand Down Expand Up @@ -113,7 +114,13 @@ public CompletableFuture<Checkpoint> triggerCheckpoint(
final String checkpointName = createCheckpointName(checkpointId);

final CompletableFuture<Checkpoint> checkpointResult =
this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService);
this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService)
.exceptionally(e -> {
if (e instanceof MaxNumberOfCheckpointsExceededException) {
readerGroup.cancelOutstandingCheckpoints();
}
return null;
});

// Add a timeout to the future, to prevent long blocking calls
scheduledExecutorService.schedule(() -> checkpointResult.cancel(false), triggerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.CheckpointImpl;
import io.pravega.client.stream.impl.MaxNumberOfCheckpointsExceededException;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.connectors.flink.serialization.CheckpointSerializer;
import org.apache.flink.api.common.time.Time;
Expand Down Expand Up @@ -95,6 +96,24 @@ public void testTriggerCheckpointTimeout() throws Exception {
assertThat(checkpointFuture.isCancelled()).isTrue();
}

/**
 * Checks that triggering a checkpoint while the reader group reports
 * {@link MaxNumberOfCheckpointsExceededException} makes the hook call
 * {@code cancelOutstandingCheckpoints()} on the reader group.
 */
@Test
public void testCancelWhenExceedingMaxOutstandingCheckpoints() throws Exception {
    // Arrange: a checkpoint future that has already failed with the "too many checkpoints" error.
    final CompletableFuture<Checkpoint> failedCheckpoint = new CompletableFuture<>();
    failedCheckpoint.completeExceptionally(new MaxNumberOfCheckpointsExceededException("test"));

    final ReaderGroupConfig mockedGroupConfig = mock(ReaderGroupConfig.class);
    final ClientConfig mockedClientConfig = mock(ClientConfig.class);
    final TestableReaderCheckpointHook checkpointHook = new TestableReaderCheckpointHook(
            HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), mockedClientConfig, mockedGroupConfig);

    // The hook's reader group hands back the failed future for any checkpoint request.
    when(checkpointHook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(failedCheckpoint);

    // Act: trigger a checkpoint on a direct (same-thread) executor.
    final CompletableFuture<Checkpoint> triggered =
            checkpointHook.triggerCheckpoint(1L, 1L, Executors.directExecutor());

    // Assert: a future is returned and the checkpoint was requested from the reader group ...
    assertThat(triggered).isNotNull();
    verify(checkpointHook.readerGroup).initiateCheckpoint(anyString(), any());

    // ... and the failure caused the outstanding checkpoints to be cancelled.
    verify(checkpointHook.readerGroup).cancelOutstandingCheckpoints();
}

@Test
public void testReset() {
ReaderGroupConfig readerGroupConfig = mock(ReaderGroupConfig.class);
Expand Down

0 comments on commit da4c706

Please sign in to comment.