Skip to content

Commit

Permalink
Merge branch 'master' into update-flink-1.17
Browse files Browse the repository at this point in the history
  • Loading branch information
crazyzhou authored Oct 12, 2023
2 parents 4177133 + a55bfe1 commit 2b58656
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 14 deletions.
10 changes: 4 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ supported versions of Flink and Pravega.

| Git Branch | Pravega Version | Flink Version | Status | Artifact Link |
|-------------------------------------------------------------------------------------|-----------------|---------------|-------------------|--------------------------------------------------------------------------------------|
| [master](https://github.com/pravega/flink-connectors) | 0.13 | 1.16 | Under Development | https://github.com/pravega/flink-connectors/packages/1441637 |
| [r0.13-flink1.15](https://github.com/pravega/flink-connectors/tree/r0.13-flink1.15) | 0.13 | 1.15 | Under Development | https://github.com/pravega/flink-connectors/packages/1441637 |
| [r0.13-flink1.14](https://github.com/pravega/flink-connectors/tree/r0.13-flink1.14) | 0.13 | 1.14 | Under Development | https://github.com/pravega/flink-connectors/packages/1441637 |
| [master](https://github.com/pravega/flink-connectors) | 0.14 | 1.16 | Under Development | https://github.com/pravega/flink-connectors/packages/1441637 |
| [r0.13](https://github.com/pravega/flink-connectors/tree/r0.13) | 0.13 | 1.16 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.16_2.12/0.13.0/ |
| [r0.13-flink1.15](https://github.com/pravega/flink-connectors/tree/r0.13-flink1.15) | 0.13 | 1.15 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.15_2.12/0.13.0/ |
| [r0.13-flink1.14](https://github.com/pravega/flink-connectors/tree/r0.13-flink1.14) | 0.13 | 1.14 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.14_2.12/0.13.0/ |
| [r0.12](https://github.com/pravega/flink-connectors/tree/r0.12) | 0.12 | 1.15 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.15_2.12/0.12.0/ |
| [r0.12-flink1.14](https://github.com/pravega/flink-connectors/tree/r0.12-flink1.14) | 0.12 | 1.14 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.14_2.12/0.12.0/ |
| [r0.12-flink1.13](https://github.com/pravega/flink-connectors/tree/r0.12-flink1.13) | 0.12 | 1.13 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.13_2.12/0.12.0/ |
| [r0.11](https://github.com/pravega/flink-connectors/tree/r0.11) | 0.11 | 1.14 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.14_2.12/0.11.0/ |
| [r0.11-flink1.13](https://github.com/pravega/flink-connectors/tree/r0.11-flink1.13) | 0.11 | 1.13 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.13_2.12/0.11.0/ |
| [r0.11-flink1.12](https://github.com/pravega/flink-connectors/tree/r0.11-flink1.12) | 0.11 | 1.12 | Released | https://repo1.maven.org/maven2/io/pravega/pravega-connectors-flink-1.12_2.12/0.11.0/ |

## How to build

Expand Down
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ nettyVersion=4.1.65.Final
testcontainersVersion=1.15.3

# Version and base tags can be overridden at build time.
connectorVersion=0.13.0-SNAPSHOT
pravegaVersion=0.13.0-3151.28c485f-SNAPSHOT
schemaRegistryVersion=0.6.0-88.65039cd-SNAPSHOT
connectorVersion=0.14.0-SNAPSHOT
pravegaVersion=0.14.0-3266.625ec7f-SNAPSHOT
schemaRegistryVersion=0.7.0-95.c4b6a43-SNAPSHOT
apacheCommonsVersion=3.7

# These properties are only needed for publishing to maven central
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderGroupNotFoundException;
import io.pravega.client.stream.impl.MaxNumberOfCheckpointsExceededException;
import io.pravega.connectors.flink.serialization.CheckpointSerializer;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.SimpleVersionedSerializer;
Expand Down Expand Up @@ -113,7 +114,13 @@ public CompletableFuture<Checkpoint> triggerCheckpoint(
final String checkpointName = createCheckpointName(checkpointId);

final CompletableFuture<Checkpoint> checkpointResult =
this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService);
this.readerGroup.initiateCheckpoint(checkpointName)
.exceptionally(e -> {
if (e instanceof MaxNumberOfCheckpointsExceededException) {
readerGroup.cancelOutstandingCheckpoints();
}
return null;
});

// Add a timeout to the future, to prevent long blocking calls
scheduledExecutorService.schedule(() -> checkpointResult.cancel(false), triggerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.CheckpointImpl;
import io.pravega.client.stream.impl.MaxNumberOfCheckpointsExceededException;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.connectors.flink.serialization.CheckpointSerializer;
import org.apache.flink.api.common.time.Time;
Expand Down Expand Up @@ -65,10 +66,10 @@ public void testTriggerCheckpoint() throws Exception {
CompletableFuture<Checkpoint> checkpointPromise = new CompletableFuture<>();
TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig);

when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise);
when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise);
CompletableFuture<Checkpoint> checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor());
assertThat(checkpointFuture).isNotNull();
verify(hook.readerGroup).initiateCheckpoint(anyString(), any());
verify(hook.readerGroup).initiateCheckpoint(anyString());

// complete the checkpoint promise
Checkpoint expectedCheckpoint = mock(Checkpoint.class);
Expand All @@ -84,17 +85,35 @@ public void testTriggerCheckpointTimeout() throws Exception {
CompletableFuture<Checkpoint> checkpointPromise = new CompletableFuture<>();

TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig);
when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise);
when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise);

CompletableFuture<Checkpoint> checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor());
assertThat(checkpointFuture).isNotNull();
verify(hook.readerGroup).initiateCheckpoint(anyString(), any());
verify(hook.readerGroup).initiateCheckpoint(anyString());

// invoke the timeout callback
hook.invokeScheduledCallables();
assertThat(checkpointFuture.isCancelled()).isTrue();
}

@Test
public void testCancelWhenExceedingMaxOutstandingCheckpoints() throws Exception {
ReaderGroupConfig readerGroupConfig = mock(ReaderGroupConfig.class);
ClientConfig clientConfig = mock(ClientConfig.class);
CompletableFuture<Checkpoint> checkpointPromise = new CompletableFuture<>();
checkpointPromise.completeExceptionally(new MaxNumberOfCheckpointsExceededException("test"));

TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig);
when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise);

CompletableFuture<Checkpoint> checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor());
assertThat(checkpointFuture).isNotNull();
verify(hook.readerGroup).initiateCheckpoint(anyString());

// invoke the cancelOutstandingCheckpoints
verify(hook.readerGroup).cancelOutstandingCheckpoints();
}

@Test
public void testReset() {
ReaderGroupConfig readerGroupConfig = mock(ReaderGroupConfig.class);
Expand Down

0 comments on commit 2b58656

Please sign in to comment.