Skip to content

Commit

Permalink
Merge pull request #12 from Serviceware/checkpoints-and-checkpoint-st…
Browse files Browse the repository at this point in the history
…ates-are-removed

Checkpoints and checkpoint states are removed
  • Loading branch information
paulh86 authored Jan 7, 2021
2 parents 3080e43 + 539fe47 commit 8af9314
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 56 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ dependencies {
testCompile "junit:junit:4.12"
testCompile "org.mockito:mockito-core:1.10.19"
testCompile 'com.pszymczyk.consul:embedded-consul:2.1.4'
testCompile 'org.awaitility:awaitility:4.0.3'
testCompile "org.apache.flink:flink-runtime_$scalaVersion:$flinkVersion:tests"
testCompile "org.apache.flink:flink-test-utils_$scalaVersion:$flinkVersion"

shadow 'com.ecwid.consul:consul-api:1.4.5'
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/espro/flink/consul/ConsulHaServices.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public LeaderElectionService getDispatcherLeaderElectionService() {

@Override
public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
return new ConsulCheckpointRecoveryFactory(client, configuration);
return new ConsulCheckpointRecoveryFactory(client, configuration, executor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.espro.flink.consul.checkpoint;

import java.util.concurrent.Executor;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
Expand All @@ -18,9 +20,11 @@ public final class ConsulCheckpointRecoveryFactory implements CheckpointRecovery

private final ConsulClient client;
private final Configuration configuration;
private final Executor executor;

public ConsulCheckpointRecoveryFactory(ConsulClient client, Configuration configuration) {
this.client = Preconditions.checkNotNull(client, "client");
public ConsulCheckpointRecoveryFactory(ConsulClient client, Configuration configuration, Executor executor) {
this.executor = executor;
this.client = Preconditions.checkNotNull(client, "client");
this.configuration = Preconditions.checkNotNull(configuration, "configuration");
}

Expand All @@ -29,7 +33,7 @@ public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumber
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = new FileSystemStateStorageHelper<>(
HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration), "completedCheckpoint");

return new ConsulCompletedCheckpointStore(client, checkpointsPath(), jobId, maxNumberOfCheckpointsToRetain, stateStorage);
return new ConsulCompletedCheckpointStore(client, checkpointsPath(), jobId, maxNumberOfCheckpointsToRetain, stateStorage, executor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import org.apache.flink.api.common.JobID;
Expand Down Expand Up @@ -37,18 +38,19 @@ final class ConsulCompletedCheckpointStore implements CompletedCheckpointStore {
private final RetrievableStateStorageHelper<CompletedCheckpoint> storage;

private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
private final Executor executor;

public ConsulCompletedCheckpointStore(ConsulClient client, String checkpointsPath, JobID jobID, int maxCheckpoints,
RetrievableStateStorageHelper<CompletedCheckpoint> storage) {
this.client = Preconditions.checkNotNull(client, "client");
RetrievableStateStorageHelper<CompletedCheckpoint> storage, Executor executor) {
this.client = Preconditions.checkNotNull(client, "client");
this.checkpointsPath = Preconditions.checkNotNull(checkpointsPath, "checkpointsPath");
Preconditions.checkArgument(checkpointsPath.endsWith("/"), "checkpointsPath must end with /");
this.jobID = Preconditions.checkNotNull(jobID, "jobID");
this.storage = Preconditions.checkNotNull(storage, "storage");
Preconditions.checkState(maxCheckpoints > 0, "maxCheckpoints must be > 0");
this.maxCheckpoints = maxCheckpoints;

this.completedCheckpoints = new ArrayDeque<>(maxCheckpoints + 1);
this.executor = executor;
}

@Override
Expand Down Expand Up @@ -100,29 +102,33 @@ public void recover() throws Exception {
}

private List<RetrievableStateHandle<CompletedCheckpoint>> readCheckpointStateHandlesFromConsul(List<String> checkpointPaths) {
return checkpointPaths.stream().map(path -> {
try {
GetBinaryValue binaryValue = client.getKVBinaryValue(path).getValue();

return InstantiationUtil.<RetrievableStateHandle<CompletedCheckpoint>>deserializeObject(
binaryValue.getValue(),
Thread.currentThread().getContextClassLoader()
);
} catch (IOException | ClassNotFoundException e) {
throw new IllegalStateException(e);
}
}).collect(Collectors.toList());
return checkpointPaths.stream()
.map(this::readCheckpointStateHandleFromConsul)
.collect(Collectors.toList());
}

private List<CompletedCheckpoint> readCheckpointsFromStorage(List<RetrievableStateHandle<CompletedCheckpoint>> stateHandles) {
return stateHandles.stream().map(sh -> {
try {
return sh.retrieveState();
} catch (IOException | ClassNotFoundException e) {
throw new IllegalStateException(e);
}
}).collect(Collectors.toList());
}
private RetrievableStateHandle<CompletedCheckpoint> readCheckpointStateHandleFromConsul(String path) {
try {
GetBinaryValue binaryValue = client.getKVBinaryValue(path).getValue();

return InstantiationUtil.<RetrievableStateHandle<CompletedCheckpoint>>deserializeObject(
binaryValue.getValue(),
Thread.currentThread().getContextClassLoader()
);
} catch (IOException | ClassNotFoundException e) {
throw new IllegalStateException(e);
}
}

private List<CompletedCheckpoint> readCheckpointsFromStorage(List<RetrievableStateHandle<CompletedCheckpoint>> stateHandles) {
return stateHandles.stream().map(sh -> {
try {
return sh.retrieveState();
} catch (IOException | ClassNotFoundException e) {
throw new IllegalStateException(e);
}
}).collect(Collectors.toList());
}

@Override
public void shutdown(JobStatus jobStatus) throws Exception {
Expand All @@ -149,16 +155,16 @@ private String jobPath() {
}

private void writeCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
String key = jobPath() + checkpoint.getCheckpointID();
String key = createCheckpointKeyForConsul(checkpoint);

RetrievableStateHandle<CompletedCheckpoint> storeHandle = storage.store(checkpoint);

byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
boolean success = false;
try {
success = client.setKVBinaryValue(key, serializedStoreHandle).getValue();
} catch (Exception ignored) {

} catch (Exception e) {
LOG.warn("Writing of checkpoint to Consul failed.", e);
}
if (!success) {
// cleanup if data was not stored in Consul
Expand All @@ -169,9 +175,28 @@ private void writeCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
}

private void removeCheckpoint(CompletedCheckpoint checkpoint) {
String key = jobPath() + checkpoint.getCheckpointID();
client.deleteKVValue(key);
String key = createCheckpointKeyForConsul(checkpoint);
RetrievableStateHandle<CompletedCheckpoint> stateHandle = readCheckpointStateHandleFromConsul(key);

try {
client.deleteKVValue(key);
stateHandle.discardState();
} catch (Exception e) {
LOG.warn("Error while deleting state handle for checkpoint {}", key, e);
return;
}

// Should only be executed if removal of state handle was successful
executor.execute(() -> {
try {
checkpoint.discardOnSubsume();
} catch (Exception e) {
LOG.warn("Fail to subsume the old checkpoint.", e);
}
});
}


private String createCheckpointKeyForConsul(CompletedCheckpoint checkpoint) {
return jobPath() + checkpoint.getCheckpointID();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright (c) SABIO GmbH, Hamburg 2020 - All rights reserved
*/
package com.espro.flink.consul.checkpoint;

import static org.junit.Assert.assertTrue;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.TestOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;

/**
* Some of these methods and class are taken from flink test module. Providing simple use of the checkpointing.
*/
public class CheckpointTestHelper {

/**
* Taken from {@link CompletedCheckpointStoreTest}.
*
* @param id - checkpijt id
* @param sharedStateRegistry - shared state registry
* @return {@link TestCompletedCheckpoint}
*/
public static TestCompletedCheckpoint createCheckpoint(long id, SharedStateRegistry sharedStateRegistry) {

int numberOfStates = 4;
CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);

OperatorID operatorID = new OperatorID();

Map<OperatorID, OperatorState> operatorGroupState = new HashMap<>();
OperatorState operatorState = new OperatorState(operatorID, numberOfStates, numberOfStates);
operatorGroupState.put(operatorID, operatorState);

for (int i = 0; i < numberOfStates; i++) {
OperatorSubtaskState subtaskState = new TestOperatorSubtaskState();

operatorState.putState(i, subtaskState);
}

operatorState.registerSharedStates(sharedStateRegistry);

return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props);
}

/**
* Taken from {@link CompletedCheckpointStoreTest}.
*
* @param completedCheckpoint
*/
public static void verifyCheckpointDiscarded(TestCompletedCheckpoint completedCheckpoint) {
assertTrue(completedCheckpoint.isDiscarded());
verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
}

/**
* Taken from {@link CompletedCheckpointStoreTest}.
*
* @param operatorStates
*/
private static void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
for (OperatorState operatorState : operatorStates) {
for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
assertTrue(((TestOperatorSubtaskState) subtaskState).isDiscarded());
}
}
}

protected static class TestCompletedCheckpoint extends CompletedCheckpoint {

private static final long serialVersionUID = 4211419809665983026L;

private boolean isDiscarded;

// Latch for test variants which discard asynchronously
private transient final CountDownLatch discardLatch = new CountDownLatch(1);

public TestCompletedCheckpoint(
JobID jobId,
long checkpointId,
long timestamp,
Map<OperatorID, OperatorState> operatorGroupState,
CheckpointProperties props) {

super(jobId, checkpointId, timestamp, Long.MAX_VALUE, operatorGroupState, null, props,
new TestCompletedCheckpointStorageLocation());
}

@Override
public boolean discardOnSubsume() throws Exception {
if (super.discardOnSubsume()) {
discard();
return true;
} else {
return false;
}
}

@Override
public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
if (super.discardOnShutdown(jobStatus)) {
discard();
return true;
} else {
return false;
}
}

void discard() {
if (!isDiscarded) {
this.isDiscarded = true;

if (discardLatch != null) {
discardLatch.countDown();
}
}
}

public boolean isDiscarded() {
return isDiscarded;
}

public void awaitDiscard() throws InterruptedException {
if (discardLatch != null) {
discardLatch.await();
}
}

public boolean awaitDiscard(long timeout) throws InterruptedException {
if (discardLatch != null) {
return discardLatch.await(timeout, TimeUnit.MILLISECONDS);
} else {
return false;
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

TestCompletedCheckpoint that = (TestCompletedCheckpoint) o;

return getJobId().equals(that.getJobId())
&& getCheckpointID() == that.getCheckpointID();
}

@Override
public int hashCode() {
return getJobId().hashCode() + (int) getCheckpointID();
}
}
}
Loading

0 comments on commit 8af9314

Please sign in to comment.