Skip to content

Commit

Permalink
Issue 191: Allow variable number of workers; simplify recovery
Browse files Browse the repository at this point in the history
- Now allows the number of workers to be different from the number of workers in previous executions.
- Simplified the recovery process by terminating the entire process upon any exceptions during checkpointing.
- Changed formatting of routing key.

Signed-off-by: Claudio Fahey <[email protected]>
  • Loading branch information
Claudio Fahey committed May 2, 2019
1 parent 5eef4a4 commit a6f9340
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void run() throws Exception {
for (;;) {
eventCounter++;
long intData = rand.nextInt(100);
String routingKey = Long.toString(intData);
String routingKey = String.format("%02d", intData);
sum += intData;
String generatedTimestampStr = dateFormat.format(new Date());
String message = String.join(",",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.pravega.client.stream.impl.UTF8StringSerializer;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -136,7 +137,7 @@ public ExactlyOnceMultithreadedProcessor(String scope, String inputStreamName, S
}

/**
* Commit all transactions that have been opened since the last checkpoint.
* Commit all transactions that are part of a checkpoint.
*
* @param checkpointName
*/
Expand All @@ -146,21 +147,23 @@ private void commitTransactions(String checkpointName) {
// Read the contents of all pravega-transactions-worker-XX files.
// These files contain the Pravega transaction IDs that must be committed now.
Path checkpointDirPath = checkpointRootPath.resolve(checkpointName);
List<UUID> txnIds = IntStream
.range(0, numWorkers)
.parallel()
.boxed()
.map(workerIndex -> checkpointDirPath.resolve(CHECKPOINT_TRANSACTION_ID_FILE_NAME_PREFIX + workerIndex))
.filter(Files::exists)
.flatMap(path -> {
try {
return Files.readAllLines(path, StandardCharsets.UTF_8).stream();
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(UUID::fromString)
.collect(Collectors.toList());
List<UUID> txnIds = null;
try {
txnIds = Files.list(checkpointDirPath)
.filter(path -> path.getFileName().toString().startsWith(CHECKPOINT_TRANSACTION_ID_FILE_NAME_PREFIX))
.parallel()
.flatMap(path -> {
try {
return Files.readAllLines(path, StandardCharsets.UTF_8).stream();
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(UUID::fromString)
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}

log.info("commitTransactions: txnIds={}", txnIds);

Expand Down Expand Up @@ -214,7 +217,8 @@ public Void call() throws InterruptedException {
}

/**
* Initiate a checkpoint, wait for it to complete, and write the checkpoint to the state.
* Initiate a checkpoint, wait for it to complete, write the checkpoint to the state,
* and commit transactions.
*/
private void performCheckpoint() {
final String checkpointName = UUID.randomUUID().toString();
Expand Down Expand Up @@ -254,8 +258,9 @@ private void performCheckpoint() {

cleanCheckpointDirectory(checkpointDirPath);
} catch (final Exception e) {
log.warn("performCheckpoint: Error performing checkpoint", e);
// Ignore error. We will retry when we are scheduled again.
// If any exception occurs, this application will abnormally terminate.
// Upon restart, it will resume from the last successful checkpoint.
panic(e);
}
log.info("performCheckpoint: END: checkpointName={}", checkpointName);
}
Expand Down Expand Up @@ -295,10 +300,17 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
}
});
} catch (IOException e) {
// Any errors here are non-fatal. The next call to this function
// will attempt to clean anything that was missed.
log.warn("cleanCheckpointDirectory", e);
}
}

private void panic(Exception e) {
log.error("Aborting due to fatal exception", e);
System.exit(1);
}

public static void main(String[] args) throws Exception {
ExactlyOnceMultithreadedProcessor master = new ExactlyOnceMultithreadedProcessor(
Parameters.getScope(),
Expand Down

0 comments on commit a6f9340

Please sign in to comment.