Issue 191: Add at-least-once stream processing example #192
base: dev
But consider what happens if the application crashes after commit() but before the reader group is updated with readNextEvent. Upon recovery, the reader group will resume at the previous checkpoint and duplicate records will be created. To avoid this, we still need to use the two-phase commit process of flush, persist transaction IDs, commit, update reader group. I may be able to do this with another Pravega stream instead of a shared file system but I don't see how to completely eliminate this need.
Yes. This is the reason the readerOffline() call takes a Position object. It allows you to tell Pravega exactly what data was processed and what data was not when the worker dies.
So yes, the two-phase scheme you are using works, but you could improve it by doing it per reader rather than per reader group.
@claudiofahey due to issues we had in previous releases merging
a6f9340 to f4e5b94
This PR is ready for re-review.
f4e5b94 to 0e57327
The latest commit should provide at-least-once semantics in a clean way using only Pravega. I believe the code is complete but it does not have any automated tests yet. We also need to update the README to describe how this works.
This PR is ready for review. For a description of this PR, see https://github.com/claudiofahey/pravega-samples/blob/issue-191-streamprocessing/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/README.md.
It looks good, I only have a couple of small comments.
    return getEnvVar("INSTANCE_ID", UUID.randomUUID().toString());
}

public String getStream1Name() {
Minor comment, but could we use input stream and output stream rather than stream 1 and 2?
AppConfiguration is shared by EventGenerator, AtLeastOnceApp, and EventDebugSink. Stream 1 is the output for EventGenerator and the input for AtLeastOnceApp. Stream 2 is the output for AtLeastOnceApp and the input for EventDebugSink. To avoid the confusion from this point of view, I chose Stream 1 and Stream 2. It is a little odd, I admit. I added comments to the code to clarify this.
// We must ensure that we add this reader to the membership synchronizer before the reader group.
membershipSynchronizer.startAsync();
membershipSynchronizer.awaitRunning();
task = executor.scheduleAtFixedRate(new PruneRunner(), heartbeatIntervalMillis, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
I'm wondering if it would be best to add some randomization to the period between runs of the prune runner. If distinct readers run it with the same period and close to each other in time, they would call reader offline unnecessarily for the same offline readers.
I don't feel very strongly about this comment because:
- reader offline is idempotent
- crashes are rare
- it should only matter for larger reader groups
In any case, if you want to introduce it here, I think it is a small improvement.
I have randomized the initial delay and the period as you have suggested.
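For illustration, a minimal sketch of how the initial delay and the period of a scheduled prune task could be randomized. This is not the code from the PR: the class name `JitteredPruneScheduler`, the helper `jitter`, and the 50% jitter fraction are assumptions made for the example, and the prune task is passed in as a plain `Runnable`.

```java
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR: spreads out prune runs across readers.
final class JitteredPruneScheduler {
    // Assumed jitter fraction: each value is stretched by up to 50% of the base interval.
    static final double JITTER_FRACTION = 0.5;

    /** Returns a value drawn uniformly from [baseMillis, baseMillis + baseMillis * jitterFraction]. */
    static long jitter(long baseMillis, double jitterFraction, Random random) {
        if (baseMillis <= 0 || jitterFraction < 0) {
            throw new IllegalArgumentException("baseMillis must be > 0 and jitterFraction >= 0");
        }
        final long maxExtra = (long) (baseMillis * jitterFraction);
        // nextDouble() is in [0, 1), so the added amount is an integer in [0, maxExtra].
        return baseMillis + (long) (random.nextDouble() * (maxExtra + 1));
    }

    /** Schedules the task with an independently randomized initial delay and period. */
    static ScheduledFuture<?> schedule(ScheduledExecutorService executor, Runnable pruneTask,
                                       long heartbeatIntervalMillis, Random random) {
        final long initialDelay = jitter(heartbeatIntervalMillis, JITTER_FRACTION, random);
        final long period = jitter(heartbeatIntervalMillis, JITTER_FRACTION, random);
        return executor.scheduleAtFixedRate(pruneTask, initialDelay, period, TimeUnit.MILLISECONDS);
    }
}
```

Because each process draws its own delay and period, two readers started at the same moment are unlikely to keep pruning in lockstep. The period stays fixed within one process; only the value chosen at startup is random.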
Signed-off-by: Claudio Fahey <[email protected]>
- Removed deleteStream calls.
- No longer waiting for transactions to go into COMMIT state after commit() call.
- Now using Callable instead of Runnable.
- Improved shutdown cleanup.
Signed-off-by: Claudio Fahey <[email protected]>
…aging state. - Added additional startup scripts. - Updated default parameters. - Updated documentation. Signed-off-by: Claudio Fahey <[email protected]>
@RaulGracia, I have addressed all of your comments. However, there appears to be an intermittent timeout failure in the test kill5of6Test. I will continue to investigate.
@claudiofahey maybe the failure you are seeing is related to the sporadic failures we see in the Pravega core repo in tests involving Pravega standalone: pravega/pravega#5864
@claudiofahey I tested the sample and it works. I also compiled it with JDK 11 and JDK 8, and it also works.
…detection. Signed-off-by: Claudio Fahey <[email protected]>
FYI, I ran the integration tests 20 times and all of them passed.
@claudiofahey tests passed for me. I had to upgrade the Scala version of Spark in the Hadoop sample, as the build was broken by one of the previous PRs. But it looks good now, thanks!
@tkaitchuck please, would you mind doing another review to either approve it or request more changes?
processor.startAsync();

// Add shutdown hook for graceful shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Shouldn't the processor do this itself, if it is needed?
It gets messy to do this in AtLeastOnceProcessor because it would then need to remove the shutdown hook, but only when the JVM is not actually shutting down. The point of this shutdown hook is to shut down the whole application gracefully, which in this case happens to consist of only one AtLeastOnceProcessor service. In general, though, an application may consist of many services that all need to be shut down in a particular order, so it seems appropriate that AtLeastOnceApp should coordinate the shutdown.
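A rough sketch of the application-level coordination described in this reply, assuming an application made of several services that must stop in the reverse of their start order. The names `AppShutdownCoordinator`, `StoppableService`, and `service` are hypothetical and do not come from the PR, which uses a single AtLeastOnceProcessor service.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical coordinator owned by the application, not by any single service.
final class AppShutdownCoordinator {
    /** Minimal stand-in for a service that can be stopped. */
    interface StoppableService {
        String name();
        void stop();
    }

    /** Convenience factory for building a named service from a stop action. */
    static StoppableService service(String name, Runnable onStop) {
        return new StoppableService() {
            @Override public String name() { return name; }
            @Override public void stop() { onStop.run(); }
        };
    }

    // Most recently registered service is at the head, so it is stopped first.
    private final Deque<StoppableService> started = new ArrayDeque<>();

    synchronized void register(StoppableService service) {
        started.push(service);
    }

    /** Stops services in reverse registration order and returns the names that stopped cleanly. */
    synchronized List<String> stopAll() {
        final List<String> stopped = new ArrayList<>();
        while (!started.isEmpty()) {
            final StoppableService service = started.pop();
            try {
                service.stop();
                stopped.add(service.name());
            } catch (RuntimeException e) {
                // Keep going so one failing service does not block the others.
                System.err.println("Failed to stop " + service.name() + ": " + e);
            }
        }
        return stopped;
    }

    /** Registers one JVM shutdown hook for the whole application. */
    Thread installShutdownHook() {
        final Thread hook = new Thread(this::stopAll, "app-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

With this shape, individual services never touch `Runtime.getRuntime()`, so they have no hook to remove when they are stopped outside a JVM shutdown.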
public void run() throws Exception {
    final ClientConfig clientConfig = ClientConfig.builder().controllerURI(getConfig().getControllerURI()).build();
    try (StreamManager streamManager = StreamManager.create(getConfig().getControllerURI())) {
I don't see the change
        }
    }
} finally {
    readerGroupManager.deleteReaderGroup(readerGroup);
I don't think we necessarily want to delete the group because one reader shut down.
In this case we do want to delete the reader group. Each instance of EventDebugSink will read the entire stream so it creates a random reader group for itself and then it cleans up when it is done. There's no need for load balancing between EventDebugSink instances (in contrast to AtLeastOnceApp).
I also added a createStream private method to this class.
public void run() throws Exception {
    final ClientConfig clientConfig = ClientConfig.builder().controllerURI(getConfig().getControllerURI()).build();
    try (StreamManager streamManager = StreamManager.create(getConfig().getControllerURI())) {
"Same here" referred to making this a private method.
createStreams();
final Random rand = new Random(42);
try (final EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(getConfig().getScope(), clientConfig)) {
    try (final EventStreamWriter<SampleEvent> writer = clientFactory.createEventWriter(
These don't need to nest. These two items can be in the same try block.
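As a sketch of the suggestion, the two resources can be declared in a single try-with-resources statement, separated by a semicolon. The example below uses a hypothetical `TrackedResource` stand-in instead of the Pravega client factory and writer so that it runs without a Pravega dependency; it only demonstrates the open and close ordering.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: TrackedResource stands in for the client factory and the writer.
final class SingleTryBlockExample {
    static final class TrackedResource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        TrackedResource(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Opens both resources in one try statement and returns the recorded order of operations. */
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        // The second resource may depend on the first; both are closed automatically,
        // in the reverse of their declaration order.
        try (TrackedResource clientFactory = new TrackedResource("clientFactory", log);
             TrackedResource writer = new TrackedResource("writer", log)) {
            log.add("write events");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The writer is closed before the client factory, which is the same order the nested form produces, so flattening the blocks does not change cleanup behavior.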
long sum = 0;
for (; ; ) {
    sequenceNumber++;
    final String routingKey = String.format("%3d", rand.nextInt(1000));
Random locals like this don't need to be declared final.
public class StreamProcessingTest {
    static final Logger log = LoggerFactory.getLogger(StreamProcessingTest.class);

    protected static final AtomicReference<SetupUtils> SETUP_UTILS = new AtomicReference<>();
This is bad form. Please use a resource to avoid this pattern.
@claudiofahey can you address @tkaitchuck's comments so we can close this one? I think we are close.
Position lastFlushedPosition = null;
try {
    while (isRunning()) {
        final EventRead<T> eventRead = reader.readNextEvent(readTimeoutMillis);
readNextEvent can throw a TruncatedDataException when StreamRetention kicks in. We would need to retry reading the event in such a scenario, right?
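One possible shape for such a retry, sketched without a Pravega dependency. The nested `TruncatedDataException` and the `EventSource` interface below are local stand-ins for the Pravega exception and reader, and `readSkippingTruncation` and `maxAttempts` are hypothetical names. The sketch assumes that a subsequent read resumes from the next available event after truncation, which should be confirmed against the Pravega client documentation.

```java
// Illustrative only: local stand-ins replace the Pravega reader and exception types.
final class TruncationRetryExample {
    /** Stand-in for Pravega's TruncatedDataException. */
    static final class TruncatedDataException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /** Stand-in for the one reader method used here. */
    interface EventSource<T> {
        T readNextEvent(long timeoutMillis);
    }

    /**
     * Calls readNextEvent again whenever the data was truncated, up to maxAttempts calls.
     * Rethrows the last TruncatedDataException if every attempt hits truncated data.
     */
    static <T> T readSkippingTruncation(EventSource<T> reader, long timeoutMillis, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        TruncatedDataException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return reader.readNextEvent(timeoutMillis);
            } catch (TruncatedDataException e) {
                // Events in the truncated range are lost; log it and try the next read.
                last = e;
                System.err.println("Data truncated on attempt " + attempt + ", retrying read");
            }
        }
        throw last;
    }
}
```

Bounding the number of attempts keeps a persistently failing reader from spinning forever. Note that skipped events are never processed, so an application relying on at-least-once semantics may want to surface the truncation rather than only log it.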
@claudiofahey any updates on addressing @tkaitchuck's feedback? This one actually looks close to being merged; I'm wondering if it is worth a last push.
These examples are intended to illustrate how at-least-once semantics can be achieved with Pravega. In particular, these illustrative examples do not use Apache Flink.