Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 191: Add at-least-once stream processing example #192

Open
wants to merge 82 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
85ea475
Issue 191: Add non-Flink stream processing example.
Mar 14, 2019
666c86e
Issue 191: Moved inner classes to top level. Removed obsolete classes.
Mar 16, 2019
eded361
Issue 191: Various updates. This PR is still incomplete.
Mar 16, 2019
13c3931
Issue 191: Processor is now stateless to avoid complications with man…
Mar 17, 2019
178957e
Issue 191: Cleanup
May 1, 2019
90af788
Issue 191: Allow variable number of workers; simplify recovery
May 2, 2019
c2c5fe6
Update to use EventStreamClientFactory. Removed ExactlyOnceMultithrea…
May 31, 2020
666ea50
Add test structure
Jun 1, 2020
c76d165
Add AtLeastOnceProcessor
Jun 1, 2020
18b8fb4
Add https://github.com/pravega/pravega/blob/c7ac009970787df2633163644…
Jun 3, 2020
6905085
Add ReaderGroupPruner (incomplete)
Jun 3, 2020
adb61b8
Add ReaderGroupPruner (incomplete)
Jun 3, 2020
6bb05b1
Bug fixes in MembershipSynchronizer
Jun 4, 2020
9584ce0
Document new classes. Add various parameters.
Jun 17, 2020
4f11177
Use new AppConfiguration class. Update license. Mark obsolete section…
Jun 17, 2020
7eac911
Add writer to AtLeastOnceApp
Jun 17, 2020
9daa883
Add graceful shutdown
Jun 17, 2020
f17131b
Add logging
Jun 17, 2020
d2f3b26
Fix typo in comment
Jun 17, 2020
39a9f92
Create runAtLeastOnceApp.sh
Jun 18, 2020
2af2fb7
Now using SampleEvent class with JSON serializer. InstanceId can now …
Jun 18, 2020
c18bfe9
Update README (incomplete)
Jun 20, 2020
353b346
Update README (incomplete)
Jun 20, 2020
f31a483
Remove unused test code. Revert changes to logback.xml. Add new scrip…
Jun 20, 2020
753c7f6
Remove unused test class.
Jun 21, 2020
faddc14
EventGenerator logs when events are acked.
Jun 21, 2020
0d6ebd1
Minor logging and formatting changes.
Jun 21, 2020
a1f3a5f
Move runAtLeastOnceApp.sh
Jun 21, 2020
82a074c
Documentation, logging, and formatting updates
Jun 21, 2020
13076f2
Fix typo.
Jun 21, 2020
bf5a9b0
Fix /README.md.
Jun 21, 2020
7836d27
Add integration test (incomplete)
Jul 5, 2020
5629ffc
Add integration test (incomplete)
Jul 5, 2020
ca59a61
Add integration test (incomplete)
Jul 5, 2020
36b18cd
Basic graceful shutdown test working
Jul 6, 2020
b47ed44
AtLeastOnceProcessor now uses suppliers. Added basic structure for fa…
Jul 8, 2020
ad26df4
Add pause and unpause to ReaderGroupPruner
Jul 8, 2020
f43a2cd
Basic pause of worker working
Jul 8, 2020
5f9aeb0
Use TestContext
Jul 9, 2020
f2bdf1a
Now validating that stopped instances do not process events
Jul 9, 2020
4601f08
Add license headers
Jul 9, 2020
5f46551
Add unit tests for TestEventValidator
Jul 9, 2020
da84a01
Add testing documentation
Jul 9, 2020
d409f44
Add test forcing duplicates.
Jul 9, 2020
bb00ed4
Add pause/unpause to AtLeastOnceProcessorInstrumented. Add failing te…
Jul 9, 2020
52e5caa
Add different write modes. Split writeEventsAndValidate.
Jul 9, 2020
b2b2873
Now using EndToEndTestConfig with builder
Jul 9, 2020
0fc8783
Add timeout to failing tests
Jul 10, 2020
a429894
Created failing test handleExceptionDuringProcessTest
Jul 10, 2020
e34bb90
Bug fix: Fix handling of position when an exception occurs during pro…
Jul 10, 2020
265fd13
Fix killAndRestart1of1ForcingDuplicatesTest
Jul 10, 2020
6aab6bb
Updated documentation. Now using in-process Pravega cluster.
Aug 4, 2020
0439f09
Update README.
Aug 4, 2020
1b5d275
Remove unneeded dependency JUnitParams.
Aug 4, 2020
f4a0be8
Add description to getStream1Name and getStream2Name
Aug 21, 2020
dc9dfcf
Randomize execution period of ReaderGroupPruner
Aug 21, 2020
f329c6b
Revert log level to ERROR
Aug 21, 2020
c3eae91
Move inline class definition to top-level class SampleEventProcessor
Sep 12, 2020
c80ef0d
Clean up EventDebugSink: Make methods private. Add final.
Sep 12, 2020
ca1d896
Remove commented code
Sep 12, 2020
3a9978a
Replace some try-with-resource blocks with Lombok @Cleanup
Sep 12, 2020
5c56356
Add final
Sep 12, 2020
6b4f1bb
Merge branch 'dev' into issue-191-streamprocessing
Mar 8, 2021
6aebc88
Merge branch 'dev' into issue-191-streamprocessing
RaulGracia Mar 18, 2021
f8155cf
Move annotations to separate lines
Mar 19, 2021
2ed765d
Fix samples version number and improve feedback in runAtLastOnceApp.sh
Mar 19, 2021
913ef6e
Fix typo in streamprocessing/README
Mar 19, 2021
ec39792
Fix formatting
Mar 19, 2021
097fb8b
Removed all commented and unused code from SetupUtils.java
Mar 19, 2021
a06dbc6
Decrease heartbeat interval for kill5of6Test to speed up dead reader …
Mar 25, 2021
5c09d55
Adding logging
Mar 25, 2021
bd6029c
Add retry to killAndRestart1of1ForcingDuplicatesTest
Mar 25, 2021
9215290
Fix retry in killAndRestart1of1ForcingDuplicatesTest
Mar 25, 2021
33902d8
Merge branch 'dev' into issue-191-streamprocessing
Mar 26, 2021
843d4c2
Fixed Scala version of Spark dependency in Hadoop samples.
RaulGracia Mar 26, 2021
8c880cd
Make methods private
Mar 27, 2021
4dc1aae
Create private method createStream
Mar 27, 2021
1ad8163
Make SampleEvent immutable. Replace @Cleanup with try-with-resources.
Mar 27, 2021
afa5139
Update README
Mar 27, 2021
40675a4
Add createStreams method to AtLeastOnceApp
Mar 27, 2021
f67ea43
Add createStreams private method to EventGenerator
Mar 27, 2021
55a8607
Merge branch 'dev' into issue-191-streamprocessing
RaulGracia Jul 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ more complex applications as sub-projects, which show use-cases exploiting one o
| `noop` | Example of how to add a simple callback executed upon a read event. | [Java](pravega-client-examples/src/main/java/io/pravega/example/noop)
| `statesynchronizer` | Application that allows users to work with `StateSynchronizer` API via CLI. | [Java](pravega-client-examples/src/main/java/io/pravega/example/statesynchronizer)
| `streamcuts` | Application examples demonstrating the use of `StreamCut`s via CLI. | [Java](pravega-client-examples/src/main/java/io/pravega/example/streamcuts)
| `streamprocessing` | An example that illustrates at-least-once processing using the Pravega API. | [Java](pravega-client-examples/src/main/java/io/pravega/example/streamprocessing)

The related documentation and instructions are [here](pravega-client-examples).

Expand Down
3 changes: 3 additions & 0 deletions pravega-client-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,6 @@ $ bin/secureBatchReader [-scope "myScope"] [-stream "myStream"] [-uri "tls://loc

All args are optional. If not included, the default values are same as the defaults mentioned earlier for
`bin\secureWriter`.

## `streamprocessing`
See [streamprocessing](src/main/java/io/pravega/example/streamprocessing/README.md).
44 changes: 44 additions & 0 deletions pravega-client-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ ext {

dependencies {
testCompile "junit:junit:${junitVersion}"
testCompile "io.pravega:pravega-standalone:${pravegaVersion}"
testCompile "io.pravega:pravega-test-testcommon:${pravegaVersion}"
compileOnly "org.projectlombok:lombok:${lombokVersion}"

compile "io.pravega:pravega-client:${pravegaVersion}",
"io.pravega:pravega-common:${pravegaVersion}",
Expand Down Expand Up @@ -182,6 +185,44 @@ task startSecureBatchReader(type: JavaExec) {
}
}

task scriptEventGenerator(type: CreateStartScripts) {
outputDir = file('build/scripts')
mainClassName = 'io.pravega.example.streamprocessing.EventGenerator'
applicationName = 'eventGenerator'
defaultJvmOpts = ["-Dlogback.configurationFile=file:conf/logback.xml"]
classpath = files(jar.archivePath) + sourceSets.main.runtimeClasspath
}

task startEventGenerator(type: JavaExec) {
main = "io.pravega.example.streamprocessing.EventGenerator"
classpath = sourceSets.main.runtimeClasspath
}

task scriptAtLeastOnceApp(type: CreateStartScripts) {
outputDir = file('build/scripts')
mainClassName = 'io.pravega.example.streamprocessing.AtLeastOnceApp'
applicationName = 'atLeastOnceApp'
defaultJvmOpts = ["-Dlogback.configurationFile=file:conf/logback.xml"]
classpath = files(jar.archivePath) + sourceSets.main.runtimeClasspath
}

task startAtLeastOnceApp(type: JavaExec) {
main = "io.pravega.example.streamprocessing.AtLeastOnceApp"
classpath = sourceSets.main.runtimeClasspath
}

task scriptEventDebugSink(type: CreateStartScripts) {
outputDir = file('build/scripts')
mainClassName = 'io.pravega.example.streamprocessing.EventDebugSink'
applicationName = 'eventDebugSink'
defaultJvmOpts = ["-Dlogback.configurationFile=file:conf/logback.xml"]
classpath = files(jar.archivePath) + sourceSets.main.runtimeClasspath
}

task startEventDebugSink(type: JavaExec) {
main = "io.pravega.example.streamprocessing.EventDebugSink"
classpath = sourceSets.main.runtimeClasspath
}

distributions {
main {
Expand All @@ -198,6 +239,9 @@ distributions {
from project.scriptSecureWriter
from project.scriptSecureReader
from project.scriptSecureBatchReader
from project.scriptEventGenerator
from project.scriptAtLeastOnceApp
from project.scriptEventDebugSink
}
into('lib') {
from(jar)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.pravega.example.streamprocessing;

import java.net.URI;
import java.util.UUID;

/**
* All parameters will come from environment variables.
* This makes it easy to configure on Docker, Kubernetes, etc.
*/
class AppConfiguration {
AppConfiguration(String[] args) {
}

// By default, we will connect to a standalone Pravega running on localhost.
public URI getControllerURI() {
return URI.create(getEnvVar("PRAVEGA_CONTROLLER", "tcp://localhost:9090"));
}

public String getScope() {
return getEnvVar("PRAVEGA_SCOPE", "examples");
}

public String getInstanceId() {
return getEnvVar("INSTANCE_ID", UUID.randomUUID().toString());
}

/**
* The output of EventGenerator and the input of AtLeastOnceApp.
*/
public String getStream1Name() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment, but could we use input stream and output stream rather than stream 1 and 2?

Copy link
Author

@claudiofahey claudiofahey Aug 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AppConfiguration is shared by EventGenerator, AtLeastOnceApp, and EventDebugSink. Stream 1 is the output for EventGenerator and the input for AtLeastOnceApp. Stream 2 is the output for AtLeastOnceApp and the input for EventDebugSink. To avoid the confusion from this point of view, I chose Stream 1 and Stream 2. It is a little odd, I admit. I added comments to the code to clarify this.

return getEnvVar("PRAVEGA_STREAM_1", "streamprocessing1c");
}

/**
* The output of AtLeastOnceApp and the input of EventDebugSink.
*/
public String getStream2Name() {
return getEnvVar("PRAVEGA_STREAM_2", "streamprocessing2c");
}

public String getReaderGroup() {
return getEnvVar("PRAVEGA_READER_GROUP", "streamprocessing1c-rg1");
}

public String getMembershipSynchronizerStreamName() {
return getReaderGroup() + "-membership";
}

public int getTargetRateEventsPerSec() {
return Integer.parseInt(getEnvVar("PRAVEGA_TARGET_RATE_EVENTS_PER_SEC", "10"));
}

public int getScaleFactor() {
return Integer.parseInt(getEnvVar("PRAVEGA_SCALE_FACTOR", "2"));
}

public int getMinNumSegments() {
return Integer.parseInt(getEnvVar("PRAVEGA_MIN_NUM_SEGMENTS", "6"));
}

public long getCheckpointPeriodMs() {
return Long.parseLong(getEnvVar("CHECKPOINT_PERIOD_MS", "3000"));
}

public long getCheckpointTimeoutMs() {
return Long.parseLong(getEnvVar("CHECKPOINT_TIMEOUT_MS", "120000"));
}

public long getTransactionTimeoutMs() {
return Long.parseLong(getEnvVar("TRANSACTION_TIMEOUT_MS", "120000"));
}

public long getHeartbeatIntervalMillis() {
return Long.parseLong(getEnvVar("HEARTBEAT_INTERVAL_MS", "500"));
}

private static String getEnvVar(String name, String defaultValue) {
String value = System.getenv(name);
if (value == null || value.isEmpty()) {
return defaultValue;
}
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.pravega.example.streamprocessing;

import com.google.gson.reflect.TypeToken;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;

/**
* This demonstrates reading events from a Pravega stream, processing each event,
* and writing each output event to another Pravega stream.
* It guarantees that each event is processed at least once.
* If multiple instances of this application are executed using the same readerGroupName parameter,
* each instance will get a distinct subset of events.
*
* Use {@link EventGenerator} to generate input events and {@link EventDebugSink}
* to view the output events.
*/
public class AtLeastOnceApp {
private static final Logger log = LoggerFactory.getLogger(AtLeastOnceApp.class);

private final AppConfiguration config;

public static void main(String[] args) throws Exception {
final AtLeastOnceApp app = new AtLeastOnceApp(new AppConfiguration(args));
app.run();
}

public AtLeastOnceApp(AppConfiguration config) {
this.config = config;
}

public AppConfiguration getConfig() {
return config;
}

public void run() throws Exception {
// Get the provided instanceId that uniquely identifes this instances of AtLeastOnceApp.
// It will be randomly generated if not provided by the user.
final String instanceId = getConfig().getInstanceId();
log.info("instanceId={}", instanceId);

// Define configuration to connect to Pravega.
final ClientConfig clientConfig = ClientConfig.builder().controllerURI(getConfig().getControllerURI()).build();

// Create the input and output streams (ignored if they already exist).
try (StreamManager streamManager = StreamManager.create(clientConfig)) {
tkaitchuck marked this conversation as resolved.
Show resolved Hide resolved
streamManager.createScope(getConfig().getScope());
final StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.byEventRate(
getConfig().getTargetRateEventsPerSec(),
getConfig().getScaleFactor(),
getConfig().getMinNumSegments()))
.build();
streamManager.createStream(getConfig().getScope(), getConfig().getStream1Name(), streamConfig);
streamManager.createStream(getConfig().getScope(), getConfig().getStream2Name(), streamConfig);
// Create stream for the membership state synchronizer.
streamManager.createStream(
getConfig().getScope(),
getConfig().getMembershipSynchronizerStreamName(),
StreamConfiguration.builder().scalingPolicy(ScalingPolicy.fixed(1)).build());
}

final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream(Stream.of(getConfig().getScope(), getConfig().getStream1Name()))
.automaticCheckpointIntervalMillis(getConfig().getCheckpointPeriodMs())
.build();
try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(getConfig().getScope(), clientConfig)) {
// Create the Reader Group (ignored if it already exists)
readerGroupManager.createReaderGroup(getConfig().getReaderGroup(), readerGroupConfig);
final ReaderGroup readerGroup = readerGroupManager.getReaderGroup(getConfig().getReaderGroup());
try (EventStreamClientFactory eventStreamClientFactory = EventStreamClientFactory.withScope(getConfig().getScope(), clientConfig);
SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(getConfig().getScope(), clientConfig);
// Create a Pravega stream writer that we will send our processed output to.
EventStreamWriter<SampleEvent> writer = eventStreamClientFactory.createEventWriter(
getConfig().getStream2Name(),
new JSONSerializer<>(new TypeToken<SampleEvent>(){}.getType()),
EventWriterConfig.builder().build())) {

final AtLeastOnceProcessor<SampleEvent> processor = new AtLeastOnceProcessor<SampleEvent>(
() -> ReaderGroupPruner.create(
readerGroup,
getConfig().getMembershipSynchronizerStreamName(),
instanceId,
synchronizerClientFactory,
Executors.newScheduledThreadPool(1),
getConfig().getHeartbeatIntervalMillis()),
() -> eventStreamClientFactory.<SampleEvent>createReader(
instanceId,
readerGroup.getGroupName(),
new JSONSerializer<>(new TypeToken<SampleEvent>(){}.getType()),
ReaderConfig.builder().build()),
1000)
{
tkaitchuck marked this conversation as resolved.
Show resolved Hide resolved
/**
* Process an event that was read.
* Processing can be performed asynchronously after this method returns.
* This method must be stateless.
*
* For this demonstration, we output the same event that was read but with
* the processedBy field set.
*
* @param eventRead The event read.
*/
@Override
public void process(EventRead<SampleEvent> eventRead) {
final SampleEvent event = eventRead.getEvent();
event.processedBy = instanceId;
event.processedLatencyMs = System.currentTimeMillis() - event.timestamp;
log.info("{}", event);
writer.writeEvent(event.routingKey, event);
}

/**
* If {@link #process} did not completely process prior events, it must do so before returning.
* If writing to a Pravega stream, this should call {@link EventStreamWriter#flush}.
*/
@Override
public void flush() {
writer.flush();
}
};

processor.startAsync();

// Add shutdown hook for graceful shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Running shutdown hook.");
processor.stopAsync();
log.info("Waiting for processor to terminate.");
processor.awaitTerminated();
log.info("Processor terminated.");
}));

processor.awaitTerminated();
}
}
}
}
Loading