diff --git a/README.md b/README.md index 10e6c4b6..ae854c9c 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ more complex applications as sub-projects, which show use-cases exploiting one o | `noop` | Example of how to add a simple callback executed upon a read event. | [Java](pravega-client-examples/src/main/java/io/pravega/example/noop) | `statesynchronizer` | Application that allows users to work with `StateSynchronizer` API via CLI. | [Java](pravega-client-examples/src/main/java/io/pravega/example/statesynchronizer) | `streamcuts` | Application examples demonstrating the use of `StreamCut`s via CLI. | [Java](pravega-client-examples/src/main/java/io/pravega/example/streamcuts) +| `streamprocessing` | An example that illustrates at-least-once processing using the Pravega API. | [Java](pravega-client-examples/src/main/java/io/pravega/example/streamprocessing) The related documentation and instructions are [here](pravega-client-examples). diff --git a/pravega-client-examples/README.md b/pravega-client-examples/README.md index 3a3cdb30..e73ee7e8 100644 --- a/pravega-client-examples/README.md +++ b/pravega-client-examples/README.md @@ -198,3 +198,6 @@ $ bin/secureBatchReader [-scope "myScope"] [-stream "myStream"] [-uri "tls://loc All args are optional. If not included, the default values are same as the defaults mentioned earlier for `bin\secureWriter`. + +## `streamprocessing` +See [streamprocessing](src/main/java/io/pravega/example/streamprocessing/README.md). 
diff --git a/pravega-client-examples/build.gradle b/pravega-client-examples/build.gradle index c903ee47..2d8e010a 100644 --- a/pravega-client-examples/build.gradle +++ b/pravega-client-examples/build.gradle @@ -26,6 +26,9 @@ ext { dependencies { testCompile "junit:junit:${junitVersion}" + testCompile "io.pravega:pravega-standalone:${pravegaVersion}" + testCompile "io.pravega:pravega-test-testcommon:${pravegaVersion}" + compileOnly "org.projectlombok:lombok:${lombokVersion}" compile "io.pravega:pravega-client:${pravegaVersion}", "io.pravega:pravega-common:${pravegaVersion}", @@ -182,6 +185,44 @@ task startSecureBatchReader(type: JavaExec) { } } +task scriptEventGenerator(type: CreateStartScripts) { + outputDir = file('build/scripts') + mainClassName = 'io.pravega.example.streamprocessing.EventGenerator' + applicationName = 'eventGenerator' + defaultJvmOpts = ["-Dlogback.configurationFile=file:conf/logback.xml"] + classpath = files(jar.archivePath) + sourceSets.main.runtimeClasspath +} + +task startEventGenerator(type: JavaExec) { + main = "io.pravega.example.streamprocessing.EventGenerator" + classpath = sourceSets.main.runtimeClasspath +} + +task scriptAtLeastOnceApp(type: CreateStartScripts) { + outputDir = file('build/scripts') + mainClassName = 'io.pravega.example.streamprocessing.AtLeastOnceApp' + applicationName = 'atLeastOnceApp' + defaultJvmOpts = ["-Dlogback.configurationFile=file:conf/logback.xml"] + classpath = files(jar.archivePath) + sourceSets.main.runtimeClasspath +} + +task startAtLeastOnceApp(type: JavaExec) { + main = "io.pravega.example.streamprocessing.AtLeastOnceApp" + classpath = sourceSets.main.runtimeClasspath +} + +task scriptEventDebugSink(type: CreateStartScripts) { + outputDir = file('build/scripts') + mainClassName = 'io.pravega.example.streamprocessing.EventDebugSink' + applicationName = 'eventDebugSink' + defaultJvmOpts = ["-Dlogback.configurationFile=file:conf/logback.xml"] + classpath = files(jar.archivePath) + 
sourceSets.main.runtimeClasspath +} + +task startEventDebugSink(type: JavaExec) { + main = "io.pravega.example.streamprocessing.EventDebugSink" + classpath = sourceSets.main.runtimeClasspath +} distributions { main { @@ -198,6 +239,9 @@ distributions { from project.scriptSecureWriter from project.scriptSecureReader from project.scriptSecureBatchReader + from project.scriptEventGenerator + from project.scriptAtLeastOnceApp + from project.scriptEventDebugSink } into('lib') { from(jar) diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AppConfiguration.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AppConfiguration.java new file mode 100644 index 00000000..c131d981 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AppConfiguration.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import java.net.URI; +import java.util.UUID; + +/** + * All parameters will come from environment variables. + * This makes it easy to configure on Docker, Kubernetes, etc. + */ +class AppConfiguration { + AppConfiguration(String[] args) { + } + + // By default, we will connect to a standalone Pravega running on localhost. + public URI getControllerURI() { + return URI.create(getEnvVar("PRAVEGA_CONTROLLER", "tcp://localhost:9090")); + } + + public String getScope() { + return getEnvVar("PRAVEGA_SCOPE", "examples"); + } + + public String getInstanceId() { + return getEnvVar("INSTANCE_ID", UUID.randomUUID().toString()); + } + + /** + * The output of EventGenerator and the input of AtLeastOnceApp. 
+ */ + public String getStream1Name() { + return getEnvVar("PRAVEGA_STREAM_1", "streamprocessing1c"); + } + + /** + * The output of AtLeastOnceApp and the input of EventDebugSink. + */ + public String getStream2Name() { + return getEnvVar("PRAVEGA_STREAM_2", "streamprocessing2c"); + } + + public String getReaderGroup() { + return getEnvVar("PRAVEGA_READER_GROUP", "streamprocessing1c-rg1"); + } + + public String getMembershipSynchronizerStreamName() { + return getReaderGroup() + "-membership"; + } + + public int getTargetRateEventsPerSec() { + return Integer.parseInt(getEnvVar("PRAVEGA_TARGET_RATE_EVENTS_PER_SEC", "10")); + } + + public int getScaleFactor() { + return Integer.parseInt(getEnvVar("PRAVEGA_SCALE_FACTOR", "2")); + } + + public int getMinNumSegments() { + return Integer.parseInt(getEnvVar("PRAVEGA_MIN_NUM_SEGMENTS", "6")); + } + + public long getCheckpointPeriodMs() { + return Long.parseLong(getEnvVar("CHECKPOINT_PERIOD_MS", "3000")); + } + + public long getCheckpointTimeoutMs() { + return Long.parseLong(getEnvVar("CHECKPOINT_TIMEOUT_MS", "120000")); + } + + public long getTransactionTimeoutMs() { + return Long.parseLong(getEnvVar("TRANSACTION_TIMEOUT_MS", "120000")); + } + + public long getHeartbeatIntervalMillis() { + return Long.parseLong(getEnvVar("HEARTBEAT_INTERVAL_MS", "500")); + } + + private static String getEnvVar(String name, String defaultValue) { + String value = System.getenv(name); + if (value == null || value.isEmpty()) { + return defaultValue; + } + return value; + } +} diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AtLeastOnceApp.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AtLeastOnceApp.java new file mode 100644 index 00000000..6192bf85 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AtLeastOnceApp.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import com.google.gson.reflect.TypeToken; +import io.pravega.client.ClientConfig; +import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.SynchronizerClientFactory; +import io.pravega.client.admin.ReaderGroupManager; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.EventRead; +import io.pravega.client.stream.EventStreamWriter; +import io.pravega.client.stream.EventWriterConfig; +import io.pravega.client.stream.ReaderConfig; +import io.pravega.client.stream.ReaderGroup; +import io.pravega.client.stream.ReaderGroupConfig; +import io.pravega.client.stream.ScalingPolicy; +import io.pravega.client.stream.Stream; +import io.pravega.client.stream.StreamConfiguration; +import lombok.Cleanup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; + +/** + * This demonstrates reading events from a Pravega stream, processing each event, + * and writing each output event to another Pravega stream. + * It guarantees that each event is processed at least once. + * If multiple instances of this application are executed using the same readerGroupName parameter, + * each instance will get a distinct subset of events. + * + * Use {@link EventGenerator} to generate input events and {@link EventDebugSink} + * to view the output events. 
+ */ +public class AtLeastOnceApp { + private static final Logger log = LoggerFactory.getLogger(AtLeastOnceApp.class); + + private final AppConfiguration config; + private final ClientConfig clientConfig; + + public static void main(String[] args) throws Exception { + final AtLeastOnceApp app = new AtLeastOnceApp(new AppConfiguration(args)); + app.run(); + } + + public AtLeastOnceApp(AppConfiguration config) { + this.config = config; + this.clientConfig = ClientConfig.builder().controllerURI(getConfig().getControllerURI()).build(); + } + + public AppConfiguration getConfig() { + return config; + } + + private void run() { + // Get the provided instanceId that uniquely identifies this instance of AtLeastOnceApp. + // It will be randomly generated if not provided by the user. + final String instanceId = getConfig().getInstanceId(); + log.info("instanceId={}", instanceId); + + createStreams(); + + final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() + .stream(Stream.of(getConfig().getScope(), getConfig().getStream1Name())) + .automaticCheckpointIntervalMillis(getConfig().getCheckpointPeriodMs()) + .build(); + @Cleanup + ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(getConfig().getScope(), clientConfig); + // Create the Reader Group (ignored if it already exists) + readerGroupManager.createReaderGroup(getConfig().getReaderGroup(), readerGroupConfig); + @Cleanup + final ReaderGroup readerGroup = readerGroupManager.getReaderGroup(getConfig().getReaderGroup()); + @Cleanup + final EventStreamClientFactory eventStreamClientFactory = EventStreamClientFactory.withScope(getConfig().getScope(), clientConfig); + @Cleanup + final SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(getConfig().getScope(), clientConfig); + // Create a Pravega stream writer that we will send our processed output to.
+ @Cleanup + final EventStreamWriter writer = eventStreamClientFactory.createEventWriter( + getConfig().getStream2Name(), + new JSONSerializer<>(new TypeToken(){}.getType()), + EventWriterConfig.builder().build()); + + final SampleEventProcessor processor = new SampleEventProcessor( + () -> ReaderGroupPruner.create( + readerGroup, + getConfig().getMembershipSynchronizerStreamName(), + instanceId, + synchronizerClientFactory, + Executors.newScheduledThreadPool(1), + getConfig().getHeartbeatIntervalMillis()), + () -> eventStreamClientFactory.createReader( + instanceId, + readerGroup.getGroupName(), + new JSONSerializer<>(new TypeToken(){}.getType()), + ReaderConfig.builder().build()), + 1000, + instanceId, + writer); + + processor.startAsync(); + + // Add shutdown hook for graceful shutdown. + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Running shutdown hook."); + processor.stopAsync(); + log.info("Waiting for processor to terminate."); + processor.awaitTerminated(); + log.info("Processor terminated."); + })); + + processor.awaitTerminated(); + } + + /** + * Create the input and output streams (ignored if they already exist). + */ + private void createStreams() { + try (StreamManager streamManager = StreamManager.create(clientConfig)) { + streamManager.createScope(getConfig().getScope()); + final StreamConfiguration streamConfig = StreamConfiguration.builder() + .scalingPolicy(ScalingPolicy.byEventRate( + getConfig().getTargetRateEventsPerSec(), + getConfig().getScaleFactor(), + getConfig().getMinNumSegments())) + .build(); + streamManager.createStream(getConfig().getScope(), getConfig().getStream1Name(), streamConfig); + streamManager.createStream(getConfig().getScope(), getConfig().getStream2Name(), streamConfig); + // Create stream for the membership state synchronizer. 
+ streamManager.createStream( + getConfig().getScope(), + getConfig().getMembershipSynchronizerStreamName(), + StreamConfiguration.builder().scalingPolicy(ScalingPolicy.fixed(1)).build()); + } + } +} diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AtLeastOnceProcessor.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AtLeastOnceProcessor.java new file mode 100644 index 00000000..dcb23a57 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/AtLeastOnceProcessor.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import io.pravega.client.stream.EventRead; +import io.pravega.client.stream.EventStreamReader; +import io.pravega.client.stream.EventStreamWriter; +import io.pravega.client.stream.Position; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Supplier; + +/** + * This is an abstract class for implementing a stateless event processor with Pravega. + * It reads an event from a Pravega stream and then calls a user-defined function to process it. + * It guarantees that each event is processed at least once, even if failures occur. + * If multiple instances are executed using the same readerGroup parameter, + * each instance will get a distinct subset of events. + * Instances can be in different processes. 
+ */ +abstract public class AtLeastOnceProcessor extends AbstractExecutionThreadService { + private static final Logger log = LoggerFactory.getLogger(AtLeastOnceProcessor.class); + + private final Supplier prunerSupplier; + private final Supplier> readerSupplier; + private final long readTimeoutMillis; + + public AtLeastOnceProcessor(Supplier prunerSupplier, Supplier> readerSupplier, long readTimeoutMillis) { + this.prunerSupplier = prunerSupplier; + this.readerSupplier = readerSupplier; + this.readTimeoutMillis = readTimeoutMillis; + } + + /** + * Run the event processor loop. + */ + @Override + protected void run() throws Exception { + // It is critical that the ReaderGroupPruner is running (and therefore has added itself to the membership synchronizer) + // before the EventStreamReader is created. Otherwise, another ReaderGroupPruner instance may place this reader offline. + // It is also critical that when this method stops running the ReaderGroupPruner is eventually stopped so that + // it no longer sends heartbeats. + try (final ReaderGroupPruner pruner = prunerSupplier.get()) { + final EventStreamReader reader = readerSupplier.get(); + Position lastProcessedPosition = null; + Position lastFlushedPosition = null; + try { + while (isRunning()) { + final EventRead eventRead = reader.readNextEvent(readTimeoutMillis); + log.info("eventRead={}", eventRead); + // We must inject the fault between read and process. + // This ensures that a *new* event cannot be processed after the fault injection latch is set. + injectFault(pruner); + if (eventRead.isCheckpoint()) { + flush(); + lastProcessedPosition = eventRead.getPosition(); + lastFlushedPosition = lastProcessedPosition; + } else if (eventRead.getEvent() != null) { + try { + process(eventRead); + lastProcessedPosition = eventRead.getPosition(); + } catch (Exception e) { + // If an exception occurs during processing, attempt to flush. 
+ flush(); + lastFlushedPosition = lastProcessedPosition; + throw e; + } + } + } + // Gracefully stop. + log.info("Stopping"); + flush(); + lastFlushedPosition = lastProcessedPosition; + } finally { + log.info("Closing reader"); + // Note that if lastFlushedPosition is null, the segment offset used by a future reader will remain unchanged. + reader.closeAt(lastFlushedPosition); + } + } + log.info("Stopped"); + } + + /** + * Process an event that was read. + * Processing can be performed asynchronously after this method returns. + * This method must be stateless. + * + * @param eventRead The event read. + */ + abstract public void process(EventRead eventRead) throws Exception; + + /** + * If {@link #process} did not completely process prior events, it must do so before returning. + * If writing to a Pravega stream, this should call {@link EventStreamWriter#flush}. + */ + public void flush() throws Exception { + } + + @VisibleForTesting + protected void injectFault(ReaderGroupPruner pruner) throws Exception { + } +} diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventDebugSink.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventDebugSink.java new file mode 100644 index 00000000..6af158b4 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventDebugSink.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import com.google.gson.reflect.TypeToken; +import io.pravega.client.ClientConfig; +import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.admin.ReaderGroupManager; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.EventRead; +import io.pravega.client.stream.EventStreamReader; +import io.pravega.client.stream.ReaderConfig; +import io.pravega.client.stream.ReaderGroupConfig; +import io.pravega.client.stream.ScalingPolicy; +import io.pravega.client.stream.Stream; +import io.pravega.client.stream.StreamConfiguration; +import lombok.Cleanup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + +/** + * A simple example that continuously shows the events in a stream. + * Each instance of this class will read the entire stream. + */ +public class EventDebugSink { + private static final Logger log = LoggerFactory.getLogger(EventDebugSink.class); + + private static final int READER_TIMEOUT_MS = 2000; + + private final AppConfiguration config; + + public static void main(String[] args) throws Exception { + EventDebugSink app = new EventDebugSink(new AppConfiguration(args)); + app.run(); + } + + public EventDebugSink(AppConfiguration config) { + this.config = config; + } + + private AppConfiguration getConfig() { + return config; + } + + private void run() { + final ClientConfig clientConfig = ClientConfig.builder().controllerURI(getConfig().getControllerURI()).build(); + createStream(); + + // Create a reader group that begins at the earliest event. 
+ final String readerGroup = UUID.randomUUID().toString().replace("-", ""); + final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() + .stream(Stream.of(getConfig().getScope(), getConfig().getStream2Name())) + .build(); + + try (final ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(getConfig().getScope(), getConfig().getControllerURI())) { + readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); + try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(getConfig().getScope(), clientConfig); + EventStreamReader reader = clientFactory.createReader( + "reader", + readerGroup, + new JSONSerializer<>(new TypeToken() {}.getType()), + ReaderConfig.builder().build())) { + long eventCounter = 0; + long sum = 0; + for (; ; ) { + final EventRead eventRead = reader.readNextEvent(READER_TIMEOUT_MS); + if (eventRead.getEvent() != null) { + eventCounter++; + sum += eventRead.getEvent().intData; + log.info("eventCounter={}, sum={}, event={}", + String.format("%6d", eventCounter), + String.format("%8d", sum), + eventRead.getEvent()); + } + } + } finally { + // Delete the reader group since it is not intended to be shared with any other instances. 
+ readerGroupManager.deleteReaderGroup(readerGroup); + } + } + } + + private void createStream() { + try (final StreamManager streamManager = StreamManager.create(getConfig().getControllerURI())) { + streamManager.createScope(getConfig().getScope()); + StreamConfiguration streamConfig = StreamConfiguration.builder() + .scalingPolicy(ScalingPolicy.byEventRate( + getConfig().getTargetRateEventsPerSec(), + getConfig().getScaleFactor(), + getConfig().getMinNumSegments())) + .build(); + streamManager.createStream(getConfig().getScope(), getConfig().getStream2Name(), streamConfig); + } + } +} diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventGenerator.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventGenerator.java new file mode 100644 index 00000000..8f6f97dc --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/EventGenerator.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import com.google.gson.reflect.TypeToken; +import io.pravega.client.ClientConfig; +import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.EventStreamWriter; +import io.pravega.client.stream.EventWriterConfig; +import io.pravega.client.stream.ScalingPolicy; +import io.pravega.client.stream.StreamConfiguration; +import lombok.Cleanup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +/** + * A simple example app to write messages to a Pravega stream. + */ +public class EventGenerator { + private static final Logger log = LoggerFactory.getLogger(EventGenerator.class); + + private final AppConfiguration config; + + public static void main(String[] args) throws Exception { + final EventGenerator app = new EventGenerator(new AppConfiguration(args)); + app.run(); + } + + public EventGenerator(AppConfiguration config) { + this.config = config; + } + + public AppConfiguration getConfig() { + return config; + } + + private void run() throws Exception { + final ClientConfig clientConfig = ClientConfig.builder().controllerURI(getConfig().getControllerURI()).build(); + createStreams(); + final Random rand = new Random(42); + try (final EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(getConfig().getScope(), clientConfig)) { + try (final EventStreamWriter writer = clientFactory.createEventWriter( + getConfig().getStream1Name(), + new JSONSerializer<>(new TypeToken() {}.getType()), + EventWriterConfig.builder().build())) { + long sequenceNumber = 0; + long sum = 0; + for (; ; ) { + sequenceNumber++; + final String routingKey = String.format("%3d", rand.nextInt(1000)); + final int 
intData = rand.nextInt(1000); + sum += intData; + final SampleEvent event = new SampleEvent( + sequenceNumber, + routingKey, + intData, + sum); + log.info("{}", event); + final CompletableFuture writeFuture = writer.writeEvent(event.routingKey, event); + final long ackedSequenceNumber = sequenceNumber; + writeFuture.thenRun(() -> log.debug("Acknowledged: sequenceNumber={}", ackedSequenceNumber)); + Thread.sleep(1000); + } + } + } + } + + private void createStreams() { + try (StreamManager streamManager = StreamManager.create(getConfig().getControllerURI())) { + streamManager.createScope(getConfig().getScope()); + StreamConfiguration streamConfig = StreamConfiguration.builder() + .scalingPolicy(ScalingPolicy.byEventRate( + getConfig().getTargetRateEventsPerSec(), + getConfig().getScaleFactor(), + getConfig().getMinNumSegments())) + .build(); + streamManager.createStream(getConfig().getScope(), getConfig().getStream1Name(), streamConfig); + streamManager.updateStream(getConfig().getScope(), getConfig().getStream1Name(), streamConfig); + } + } +} diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/JSONSerializer.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/JSONSerializer.java new file mode 100644 index 00000000..7cbdead9 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/JSONSerializer.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.example.streamprocessing; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.pravega.client.stream.Serializer; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.lang.reflect.Type; +import java.nio.ByteBuffer; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * An implementation of {@link Serializer} that uses JSON serialization. + * @param <T> Type of Object to be serialized. + */ +@Slf4j +@AllArgsConstructor +public class JSONSerializer implements Serializer { + + /** + * This is used to represent the generic type. The user can use {@link com.google.gson.reflect.TypeToken} to represent + * the type of the object which will be serialized/deserialized. + */ + private final Type type; + private final Gson gson = new GsonBuilder().create(); + + /** + * {@inheritDoc} + * JSON based serialization. + */ + @Override + public ByteBuffer serialize(T value) { + return ByteBuffer.wrap(gson.toJson(value, type).getBytes(UTF_8)); + } + + /** + * {@inheritDoc} + * JSON based deserialization. + */ + @Override + public T deserialize(ByteBuffer serializedValue) { + return gson.fromJson(UTF_8.decode(serializedValue).toString(), type); + } + +} \ No newline at end of file diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/MembershipSynchronizer.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/MembershipSynchronizer.java new file mode 100644 index 00000000..8e6ac71c --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/MembershipSynchronizer.java @@ -0,0 +1,296 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractService; +import io.pravega.client.SynchronizerClientFactory; +import io.pravega.client.state.InitialUpdate; +import io.pravega.client.state.Revision; +import io.pravega.client.state.Revisioned; +import io.pravega.client.state.StateSynchronizer; +import io.pravega.client.state.SynchronizerConfig; +import io.pravega.client.state.Update; +import io.pravega.client.stream.impl.JavaSerializer; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** + * This class maintains a list of process names for processes that are healthy, + * as indicated by periodic heartbeat messages. + * It uses a Pravega state synchronizer to allow processes to communicate + * with each other. + */ +@Slf4j +public class MembershipSynchronizer extends AbstractService { + + /** + * Number of intervals behind before switching to unconditional updates. + */ + private static final int UNCONDITIONAL_UPDATE_THRESHOLD = 3; + + /** + * Number of intervals behind before we should stop executing for safety. + */ + private static final int UNHEALTHY_THRESHOLD = 5; + + /** + * Number of intervals behind before another host should be considered dead. + */ + private static final int DEATH_THRESHOLD = 10; + + /** + * Unique identifier for this member. 
+ */ + private final String instanceId; + + /** + * How frequently to update the segment using a heartbeat. + */ + private final long heartbeatIntervalMillis; + + private final AtomicBoolean healthy = new AtomicBoolean(); + + private final ScheduledExecutorService executor; + + private final StateSynchronizer stateSync; + private final MembershipListener listener; + private ScheduledFuture task; + + MembershipSynchronizer(String streamName, String instanceId, long heartbeatIntervalMillis, SynchronizerClientFactory clientFactory, ScheduledExecutorService executor, + MembershipListener listener) { + Preconditions.checkNotNull(streamName); + Preconditions.checkNotNull(clientFactory); + Preconditions.checkNotNull(listener); + this.instanceId = instanceId; + this.heartbeatIntervalMillis = heartbeatIntervalMillis; + this.executor = executor; + this.listener = listener; + stateSync = clientFactory.createStateSynchronizer(streamName, + new JavaSerializer(), + new JavaSerializer(), + SynchronizerConfig.builder().build()); + } + + @Data + private static class LiveInstances + implements Revisioned, Comparable, Serializable, InitialUpdate { + + private static final long serialVersionUID = 1L; + private final String scopedStreamName; + private final Revision revision; + private final Map liveInstances; + private final long vectorTime; + + @Override + public int compareTo(LiveInstances o) { + return revision.compareTo(o.revision); + } + + @Override + public LiveInstances create(String scopedStreamName, Revision revision) { + return new LiveInstances(scopedStreamName, revision, liveInstances, vectorTime); + } + + public boolean isHealthy(String instance) { + long unhealthyThreshold = vectorTime - UNHEALTHY_THRESHOLD * liveInstances.size(); + Long time = liveInstances.get(instance); + return time == null || time >= unhealthyThreshold; + } + + /** + * If a host is behind in its heartbeating it does not want to become unhealthy as it might + * need to halt execution.
So it can use unconditional writes to update itself more quickly + * by avoiding contention. + */ + public boolean isOverUnconditionalThreshold(String instance) { + long updateThreshold = vectorTime - UNCONDITIONAL_UPDATE_THRESHOLD * liveInstances.size(); + Long time = liveInstances.get(instance); + return time == null || time < updateThreshold; + } + + public List findInstancesThatWillDieBy(long vectorTime) { + long deathThreshold = vectorTime - DEATH_THRESHOLD * liveInstances.size(); + return liveInstances.entrySet() + .stream() + .filter(entry -> entry.getValue() < deathThreshold) + .map(entry -> entry.getKey()) + .collect(Collectors.toList()); + } + + public Set getLiveInstances() { + return liveInstances.keySet(); + } + } + + private static class CreateState implements Serializable, InitialUpdate { + private static final long serialVersionUID = 1L; + + @Override + public LiveInstances create(String scopedStreamName, Revision revision) { + return new LiveInstances(scopedStreamName, revision, new HashMap<>(), 0); + } + } + + private class HeartBeater implements Runnable { + @Override + public void run() { + try { + stateSync.fetchUpdates(); + notifyListener(); + log.debug("run: BEGIN: vectorTime={}, isOverUnconditionalThreshold={}, liveInstances={}", + stateSync.getState().getVectorTime(), + stateSync.getState().isOverUnconditionalThreshold(instanceId), + stateSync.getState().liveInstances); + if (!stateSync.getState().isOverUnconditionalThreshold(instanceId)) { + stateSync.updateState((state, updates) -> { + long vectorTime = state.getVectorTime() + 1; + updates.add(new HeartBeat(instanceId, vectorTime)); + for (String id : state.findInstancesThatWillDieBy(vectorTime)) { + if (!id.equals(instanceId)) { + updates.add(new DeclareDead(id)); + } + } + }); + } else { + stateSync.updateStateUnconditionally(new HeartBeat(instanceId, stateSync.getState().vectorTime)); + stateSync.fetchUpdates(); + } + notifyListener(); + log.debug("run: END: vectorTime={},
liveInstances={}", + stateSync.getState().getVectorTime(), + stateSync.getState().liveInstances); + } catch (Exception e) { + log.warn("Encountered an error while heartbeating", e); + if (healthy.compareAndSet(true, false)) { + listener.unhealthy(); + } + } + } + } + + private static abstract class HeartbeatUpdate implements Update, Serializable { + private static final long serialVersionUID = 1L; + } + + @RequiredArgsConstructor + private static class HeartBeat extends HeartbeatUpdate { + private static final long serialVersionUID = 1L; + private final String name; + private final long timestamp; + + @Override + public LiveInstances applyTo(LiveInstances state, Revision newRevision) { + Map timestamps = new HashMap<>(state.liveInstances); + long vectorTime = Long.max(timestamps.values().stream().max(Long::compare).orElse(0L), timestamp); + timestamps.put(name, timestamp); + return new LiveInstances(state.scopedStreamName, + newRevision, + Collections.unmodifiableMap(timestamps), + vectorTime); + } + } + + @RequiredArgsConstructor + private static final class DeclareDead extends HeartbeatUpdate { + private static final long serialVersionUID = 1L; + private final String name; + + @Override + public LiveInstances applyTo(LiveInstances state, Revision newRevision) { + Map timestamps = new HashMap<>(state.liveInstances); + timestamps.remove(name); + return new LiveInstances(state.scopedStreamName, + newRevision, + Collections.unmodifiableMap(timestamps), + state.vectorTime); + } + } + + public void notifyListener() { + LiveInstances currentState = stateSync.getState(); + if (currentState.isHealthy(instanceId)) { + if (healthy.compareAndSet(false, true)) { + listener.healthy(); + } + } else { + if (healthy.compareAndSet(true, false)) { + listener.unhealthy(); + } + } + } + + public boolean isCurrentlyHealthy() { + return healthy.get(); + } + + public Set getCurrentMembers() { + stateSync.fetchUpdates(); + return stateSync.getState().getLiveInstances(); + } + + public 
interface MembershipListener { + default void healthy() {}; + + default void unhealthy() {}; + } + + @Override + protected void doStart() { + // Create initial empty state if stream is empty. + stateSync.initialize(new CreateState()); + // Try to ensure that this instance is considered healthy before returning. + stateSync.fetchUpdates(); + stateSync.updateStateUnconditionally(new HeartBeat(instanceId, stateSync.getState().vectorTime)); + stateSync.fetchUpdates(); + notifyListener(); + + task = executor.scheduleAtFixedRate( + new HeartBeater(), + heartbeatIntervalMillis, + heartbeatIntervalMillis, + TimeUnit.MILLISECONDS); + notifyStarted(); + } + + @Override + protected void doStop() { + task.cancel(false); + } + + @VisibleForTesting + public void pause() { + log.warn("paused"); + task.cancel(false); + } + + /** + * Not implemented. + */ + @VisibleForTesting + public void unpause() { + log.warn("unpause"); + } +} diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/README.md b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/README.md new file mode 100644 index 00000000..5e9da864 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/README.md @@ -0,0 +1,212 @@ +# Pravega Stream Processing Example + +# Overview + +The examples in this directory are intended to illustrate how at-least-once semantics can be achieved with Pravega. +These show how to use Pravega directly, without relying on an external framework such as Apache Flink. + +These examples include: + +- [EventGenerator](EventGenerator.java): + This application generates new events every 1 second + and writes them to a Pravega stream (referred to as stream1). + +- [AtLeastOnceApp](AtLeastOnceApp.java): + This application demonstrates reading events from a Pravega stream (stream1), processing each event, + and writing each output event to another Pravega stream (stream2). 
+ It guarantees that each event is processed at least once, even in the presence of failures. + If multiple instances of this application are executed using the same Reader Group name, + each instance will get a distinct subset of events, providing load balancing and redundancy. + The user-defined processing function must be stateless. + + There is no direct dependency on other systems, except for Pravega. + In particular, this application does not *directly* use a distributed file system, nor ZooKeeper. + Coordination between processes occurs only through Pravega streams such as + Reader Groups and State Synchronizers. + +- [EventDebugSink](EventDebugSink.java): + This application reads events from stream2 and displays them on the console. + +# How to Run + +- (Optional) Enable INFO (or DEBUG) level logging by editing the file [logback.xml](../../../../../resources/logback.xml). + Update it to include: + ``` + + ``` + +- Start the event generator. + ```shell + cd ~/pravega-samples + ./gradlew pravega-client-examples:startEventGenerator + ``` + + All parameters are provided as environment variables. + You can either set them in your shell (`export PRAVEGA_SCOPE=examples`) or use the below syntax. + + If you are using a non-local Pravega instance, specify the controller as follows: + ```shell + PRAVEGA_CONTROLLER=tcp://pravega.example.com:9090 ./gradlew pravega-client-examples:startEventGenerator + ``` + + Multiple parameters can be specified as follows. + ```shell + PRAVEGA_SCOPE=examples PRAVEGA_CONTROLLER=tcp://localhost:9090 ./gradlew pravega-client-examples:startEventGenerator + ``` + + See [AppConfiguration.java](AppConfiguration.java) for available parameters. 
+ +- You will see log lines showing the generated events, as follows: + ``` + EventGenerator: SampleEvent{sequenceNumber= 144, routingKey=819, intData=788, sum= 71872, timestampStr=2021-03-27T00:42:41.549Z, processedLatencyMs= 0, processedBy=null} + EventGenerator: SampleEvent{sequenceNumber= 145, routingKey=725, intData=590, sum= 72462, timestampStr=2021-03-27T00:42:42.551Z, processedLatencyMs= 0, processedBy=null} + ``` + +- In another window, start one or more instances of the stream processor. + The `runAtLeastOnceApp.sh` script can be used to run multiple instances concurrently. + + ```shell + cd ~/pravega-samples + scripts/runAtLeastOnceApp.sh 2 + ``` + + You may view the log files `/tmp/atLeastOnceApp-*.log`. + +- The log file `/tmp/atLeastOnceApp-02.log` will show the output of processor 02, which will process approximately half of the events. + ``` + SampleEventProcessor: SampleEvent{sequenceNumber= 144, routingKey=819, intData=788, sum= 71872, timestampStr=2021-03-27T00:42:41.549Z, processedLatencyMs= 21, processedBy=02} + SampleEventProcessor: SampleEvent{sequenceNumber= 145, routingKey=725, intData=590, sum= 72462, timestampStr=2021-03-27T00:42:42.551Z, processedLatencyMs= 284, processedBy=02} + ``` + +- Start a single instance of the event debug sink: + ```shell + ./gradlew pravega-client-examples:startEventDebugSink + ``` + +- The event debug sink will read from the same Pravega stream that the multiple instances of AtLeastOnceApp are writing to. + ``` + EventDebugSink: eventCounter= 134, sum= 63485, event=SampleEvent{sequenceNumber= 144, routingKey=819, intData=788, sum= 71872, timestampStr=2021-03-27T00:42:41.549Z, processedLatencyMs= 21, processedBy=02} + EventDebugSink: eventCounter= 220, sum= 109622, event=SampleEvent{sequenceNumber= 145, routingKey=725, intData=590, sum= 72462, timestampStr=2021-03-27T00:42:42.551Z, processedLatencyMs= 284, processedBy=02} + ``` + +- Go ahead and kill one of the AtLeastOnceApp processes. 
+ ```shell + kill $(jps | grep AtLeastOnceApp | head -n 1 | awk '{print $1;}') + ``` + +- The remaining AtLeastOnceApp will immediately begin processing any segments that were assigned to the killed process. + +- Run the kill command again to kill the remaining process. + Now you will see that the event debug sink stops showing new events. + +- Start one new processor with `scripts/runAtLeastOnceApp.sh 1`. + You will see that the event debug sink resumes showing processed events, with no gaps. + Since the AtLeastOnceApp was stopped gracefully, there will also be no duplicates. + +- Repeat this experiment with `kill -9` to see how it behaves with an ungraceful shutdown. + You may see duplicates. + +# Parallelism + +A Reader Group will provide load balancing by assigning each reader a distinct set of segments (partitions). +Readers may not share segments. +If there are more readers than segments, the extra readers will be idle. +The number of segments is controlled by the stream's scaling policy. +A scaling policy can specify a fixed number of segments or a dynamic number of segments based on +the target write rate, measured as events/sec or bytes/sec. + +# Achieving At-Least-Once Semantics + +## Reader Groups + +Pravega uses a [Reader Group](http://pravega.io/docs/latest/reader-group-design/) to coordinate +load balancing across multiple readers of a Pravega stream. +A Reader Group tracks the position (byte offsets) of each reader. +The Reader Group is the central building block for the at-least-once processor. + +## Checkpoints + +A checkpoint can be initiated on a Reader Group by writing a `ReaderGroupState.CreateCheckpoint` event, +causing all readers to receive a checkpoint event when they call `EventStreamReader.readNextEvent()`. +When a reader receives a checkpoint event, +it is expected to flush any pending writes and then it will notify the Reader Group by writing a +`ReaderGroupState.CheckpointReader` event as represented below. 
+ +```java +class CheckpointReader extends ReaderGroupStateUpdate { + String checkpointId; + String readerId; + Map positions; // map from segment to byte offset of the next event to read + // ... +} +``` + +By default, checkpoints are initiated automatically by the Reader Group. +The interval can be specified using `ReaderGroupConfig.automaticCheckpointIntervalMillis()`. + +## Graceful shutdown + +A graceful shutdown of an at-least-once process requires that it call +`EventStreamReader.closeAt(position)`, where `position` +is the position object of the last event that was successfully processed. +If `position` is null, this will indicate that the reader did not successfully process any events. +When graceful shutdowns occur, events will be processed exactly once. + +## Exceptional shutdown + +An exceptional shutdown occurs when `AtLeastOnceProcessor.run()` encounters a fatal exception. +This is handled similarly to the graceful shutdown case. + +## Ungraceful shutdown + +An ungraceful shutdown of a process can occur when the host abruptly loses power, +when it loses its network connection, or when the process is terminated with `kill -9`. +In these situations, the process is unable to successfully call `EventStreamReader.closeAt(position)`. +However, the Reader Group guarantees that any assigned segments will remain assigned +to the reader until `readerOffline()` is called. +If no action is taken, the events in the segments assigned to the dead worker's reader will never be read. +Therefore, we must have another process that is able to detect any dead workers +and call `readerOffline(readerId, lastPosition)` on their behalf. +The `ReaderGroupPruner` class provides this functionality. +Each worker process runs an instance of `ReaderGroupPruner`. +When it detects a dead worker, it calls `readerOffline(readerId, lastPosition)` with +a null value for `lastPosition`. 
+The null value for `lastPosition` indicates that it should use the position stored in the +last `ReaderGroupState.CheckpointReader` event written by that reader in the Reader Group. +Any events processed by the now-dead worker after the last checkpoint will be reprocessed by other workers. +If writes are not idempotent, this will produce duplicates. + +In order to detect dead workers, each worker process must run an instance of `ReaderGroupPruner`. +`ReaderGroupPruner` uses a [MembershipSynchronizer](MembershipSynchronizer.java) which uses a +[State Synchronizer](http://pravega.io/docs/latest/state-synchronizer-design/) to +maintain the set of workers that are providing heartbeats. +Each worker sends a heartbeat by adding an update to the `MembershipSynchronizer`'s State Synchronizer. +Workers that fail to provide a heartbeat after 10 intervals will be removed from the `MembershipSynchronizer`. +Finally, any workers in the Reader Group that are not in the `MembershipSynchronizer` are +considered dead and `readerOffline(readerId, null)` will be called by one or more instances of `ReaderGroupPruner`. + +## Stream Life Cycle + +The first instance of `AtLeastOnceApp` will create new State Synchronizer streams for the Reader Group and the `MembershipSynchronizer`. +All subsequent instances will use the existing streams. +It is expected that the number of worker processes may scale up and down and may even scale to zero processes during maintenance windows. +To ensure that the position of readers is not lost, the `AtLeastOnceApp` will not automatically delete these streams. +These streams can be deleted manually when it is known that the application will not need to restart from the +stored positions. + +# Achieving Exactly Once Semantics + +Exactly-once semantics can be achieved by using an idempotent writer with an at-least-once processor. +The `AtLeastOnceApp` writes its output to a non-transactional Pravega stream using `EventStreamWriter` which is *not* idempotent. 
+However, if this were modified to write to a key/value store or relational database +with a deterministic key, then the writer would be idempotent and the system would provide exactly-once semantics. +Another technique is to use a transactional Pravega stream, which does offer idempotence. + +# Stateful Exactly-once Semantics with Apache Flink + +Although achieving at-least-once semantics with a parallel stateless processor is relatively simple +as shown here, it becomes significantly more complex when state must be managed +or when exactly-once semantics is required without an idempotent writer. +In these cases, [Apache Flink](https://flink.apache.org/) with the +[Pravega connectors](https://github.com/pravega/flink-connectors) provides a framework +to greatly simplify application development. diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/ReaderGroupPruner.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/ReaderGroupPruner.java new file mode 100644 index 00000000..195bf4f5 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/ReaderGroupPruner.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.AbstractService; +import io.pravega.client.SynchronizerClientFactory; +import io.pravega.client.stream.ReaderGroup; +import lombok.extern.slf4j.Slf4j; + +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * This class removes unhealthy processes from a Pravega reader group. + * It uses {@link MembershipSynchronizer} to identify healthy processes. + */ +@Slf4j +public class ReaderGroupPruner extends AbstractService implements AutoCloseable { + private final ReaderGroup readerGroup; + private final MembershipSynchronizer membershipSynchronizer; + private final ScheduledExecutorService executor; + private final long heartbeatIntervalMillis; + + private ScheduledFuture task; + + public static ReaderGroupPruner create(ReaderGroup readerGroup, String membershipSynchronizerStreamName, + String readerId, SynchronizerClientFactory clientFactory, + ScheduledExecutorService executor, long heartbeatIntervalMillis) { + final ReaderGroupPruner pruner = new ReaderGroupPruner(readerGroup, membershipSynchronizerStreamName, readerId, clientFactory, + executor, heartbeatIntervalMillis); + pruner.startAsync(); + pruner.awaitRunning(); + return pruner; + } + + public ReaderGroupPruner(ReaderGroup readerGroup, String membershipSynchronizerStreamName, String readerId, + SynchronizerClientFactory clientFactory, + ScheduledExecutorService executor, long heartbeatIntervalMillis) { + this.readerGroup = readerGroup; + this.membershipSynchronizer = new MembershipSynchronizer( + membershipSynchronizerStreamName, + readerId, + heartbeatIntervalMillis, + clientFactory, + executor, + new MembershipSynchronizer.MembershipListener() 
{}); + this.executor = executor; + this.heartbeatIntervalMillis = heartbeatIntervalMillis; + } + + private class PruneRunner implements Runnable { + @Override + public void run() { + try { + Set rgMembers = readerGroup.getOnlineReaders(); + Set msMembers = membershipSynchronizer.getCurrentMembers(); + log.debug("rgMembers={}", rgMembers); + log.debug("msMembers={}", msMembers); + rgMembers.removeAll(msMembers); + rgMembers.forEach(readerId -> { + log.info("Removing dead reader {} from reader group {}", readerId, readerGroup.getGroupName()); + readerGroup.readerOffline(readerId, null); + }); + } catch (Exception e) { + log.warn("Encountered an error while pruning reader group {}", readerGroup.getGroupName(), e); + // Ignore error. It will retry at the next iteration. + } + } + } + + @Override + protected void doStart() { + // We must ensure that we add this reader to the membership synchronizer before the reader group. + membershipSynchronizer.startAsync(); + membershipSynchronizer.awaitRunning(); + // Initial delay will be between 50% and 100% of the heartbeat interval. + // Although this is not needed for correctness, it reduces unnecessary load + // caused by multiple processes attempting to put the same reader offline. + final long initialDelay = (long) (heartbeatIntervalMillis * (0.5 + Math.random() * 0.5)); + // Period will between 90% and 100% of the heartbeat interval. + final long period = (long) (heartbeatIntervalMillis * (0.9 + Math.random() * 0.1)); + task = executor.scheduleAtFixedRate(new PruneRunner(), initialDelay, period, TimeUnit.MILLISECONDS); + notifyStarted(); + } + + @Override + protected void doStop() { + task.cancel(false); + membershipSynchronizer.stopAsync(); + } + + @VisibleForTesting + public void pause() { + log.warn("paused"); + task.cancel(false); + membershipSynchronizer.pause(); + } + + /** + * Not implemented. 
+ */ + @VisibleForTesting + public void unpause() { + log.warn("unpause"); + membershipSynchronizer.unpause(); + } + + @Override + public void close() throws Exception { + stopAsync(); + } +} diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/SampleEvent.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/SampleEvent.java new file mode 100644 index 00000000..7a8e46d9 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/SampleEvent.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import java.text.SimpleDateFormat; +import java.util.Date; + +public class SampleEvent { + final public long sequenceNumber; + final public String routingKey; + final public long intData; + final public long sum; + final public long timestamp; + final public String timestampStr; + final public long processedLatencyMs; + final public String processedBy; + + static final private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + + /// Create a new SampleEvent that represents an unprocessed event. + public SampleEvent(long sequenceNumber, String routingKey, long intData, long sum) { + this.sequenceNumber = sequenceNumber; + this.routingKey = routingKey; + this.intData = intData; + this.sum = sum; + this.timestamp = System.currentTimeMillis(); + this.timestampStr = dateFormat.format(new Date(this.timestamp)); + this.processedLatencyMs = 0; + this.processedBy = null; + } + + /// Create a new SampleEvent that represents a processed event. 
+ public SampleEvent(SampleEvent event, String processedBy) { + this.sequenceNumber = event.sequenceNumber; + this.routingKey = event.routingKey; + this.intData = event.intData; + this.sum = event.sum; + this.timestamp = event.timestamp; + this.timestampStr = event.timestampStr; + this.processedLatencyMs = System.currentTimeMillis() - event.timestamp; + this.processedBy = processedBy; + } + + @Override + public String toString() { + return "SampleEvent{" + + "sequenceNumber=" + String.format("%6d", sequenceNumber) + + ", routingKey=" + routingKey + + ", intData=" + String.format("%3d", intData) + + ", sum=" + String.format("%8d", sum) + + ", timestampStr=" + timestampStr + + ", processedLatencyMs=" + String.format("%6d", processedLatencyMs) + + ", processedBy=" + processedBy + + '}'; + } +} diff --git a/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/SampleEventProcessor.java b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/SampleEventProcessor.java new file mode 100644 index 00000000..2a6dc717 --- /dev/null +++ b/pravega-client-examples/src/main/java/io/pravega/example/streamprocessing/SampleEventProcessor.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import io.pravega.client.stream.EventRead; +import io.pravega.client.stream.EventStreamReader; +import io.pravega.client.stream.EventStreamWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Supplier; + +/** + * This is an implementation of AtLeastOnceProcessor that performs a simple + * operation to demonstrate processing a SampleEvent. 
+ * For this demonstration, we output to a Pravega stream the same event that was read but with + * the processedBy field set. + */ +public class SampleEventProcessor extends AtLeastOnceProcessor { + private static final Logger log = LoggerFactory.getLogger(SampleEventProcessor.class); + + private final String instanceId; + private final EventStreamWriter writer; + + public SampleEventProcessor(Supplier prunerSupplier, + Supplier> readerSupplier, + long readTimeoutMillis, + String instanceId, + EventStreamWriter writer) { + super(prunerSupplier, readerSupplier, readTimeoutMillis); + this.instanceId = instanceId; + this.writer = writer; + } + + /** + * Process an event that was read. + * Processing can be performed asynchronously after this method returns. + * This method must be stateless. + * + * @param eventRead The event read. + */ + @Override + public void process(EventRead eventRead) { + final SampleEvent event = eventRead.getEvent(); + // This is where we would do something useful with the event. + final SampleEvent processedEvent = new SampleEvent(event, instanceId); + log.info("{}", processedEvent); + writer.writeEvent(processedEvent.routingKey, processedEvent); + } + + /** + * If {@link #process} did not completely process prior events, it must do so before returning. + * If writing to a Pravega stream, this should call {@link EventStreamWriter#flush}. 
+ */ + @Override + public void flush() { + writer.flush(); + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/AtLeastOnceProcessorInstrumented.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/AtLeastOnceProcessorInstrumented.java new file mode 100644 index 00000000..ee8e0425 --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/AtLeastOnceProcessorInstrumented.java @@ -0,0 +1,116 @@ +package io.pravega.example.streamprocessing; + +import io.pravega.client.stream.EventRead; +import io.pravega.client.stream.EventStreamReader; +import io.pravega.client.stream.EventStreamWriter; +import io.pravega.common.util.ReusableLatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * An AtLeastOnceProcessor that is instrumented for testing. + */ +public class AtLeastOnceProcessorInstrumented extends AtLeastOnceProcessor { + private static final Logger log = LoggerFactory.getLogger(AtLeastOnceProcessorInstrumented.class); + + private final int instanceId; + private final EventStreamWriter writer; + + private final ReusableLatch pauseLatch = new ReusableLatch(true); + private final AtomicLong unflushedEventCount = new AtomicLong(0); + private final AtomicBoolean induceFailureDuringFlushFlag = new AtomicBoolean(false); + private final AtomicBoolean induceFailureDuringProcessFlag = new AtomicBoolean(false); + private final AtomicReference writeModeRef = new AtomicReference<>(); + // A queue containing unflushed events. 
+ private final List queue = new ArrayList<>(); + + public AtLeastOnceProcessorInstrumented( + Supplier pruner, + Supplier> reader, + long readTimeoutMillis, + WriteMode writeMode, + int instanceId, + EventStreamWriter writer) { + super(pruner, reader, readTimeoutMillis); + writeModeRef.set(writeMode); + this.instanceId = instanceId; + this.writer = writer; + } + + @Override + public void process(EventRead eventRead) throws Exception { + final TestEvent event = eventRead.getEvent(); + event.processedByInstanceId = instanceId; + final WriteMode writeMode = writeModeRef.get(); + log.info("process: writeMode={}, event={}", writeMode, event); + if (induceFailureDuringProcessFlag.get()) { + throw new RuntimeException("induceFailureDuringProcess is set"); + } + if (writeMode == WriteMode.AlwaysHoldUntilFlushed) { + queue.add(event); + unflushedEventCount.incrementAndGet(); + } else { + final CompletableFuture future = writer.writeEvent(Integer.toString(event.key), event); + if (writeMode == WriteMode.AlwaysDurable) { + future.get(); + } else { + unflushedEventCount.incrementAndGet(); + } + } + } + + @Override + public void flush() { + if (induceFailureDuringFlushFlag.get()) { + log.warn("induceFailureDuringFlushFlag is set"); + throw new RuntimeException("induceFailureDuringFlushFlag is set"); + } + log.info("flush: Writing {} queued events", queue.size()); + queue.forEach((event) -> writer.writeEvent(Integer.toString(event.key), event)); + queue.clear(); + writer.flush(); + final long flushedEventCount = unflushedEventCount.getAndSet(0); + log.info("flush: Flushed {} events", flushedEventCount); + } + + @Override + protected void injectFault(ReaderGroupPruner pruner) throws Exception { + if (!pauseLatch.isReleased()) { + log.warn("injectFault: BEGIN"); + // Pause pruner (but do not close it). This will also pause the membership synchronizer. + pruner.pause(); + // Halt this processor thread until the latch is released. 
+ pauseLatch.await(); + pruner.unpause(); + log.warn("injectFault: END"); + } + } + + public void pause() { + pauseLatch.reset(); + } + + public void unpause() { + pauseLatch.release(); + } + + public void induceFailureDuringProcess() { + induceFailureDuringProcessFlag.set(true); + } + + public void induceFailureDuringFlush() { + induceFailureDuringFlushFlag.set(true); + } + + public void setWriteMode(WriteMode writeMode) { + writeModeRef.set(writeMode); + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/MissingEventException.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/MissingEventException.java new file mode 100644 index 00000000..bcc46a48 --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/MissingEventException.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +class MissingEventException extends RuntimeException { + public MissingEventException(String s) { + super(s); + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/NoMoreEventsException.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/NoMoreEventsException.java new file mode 100644 index 00000000..b3c863cc --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/NoMoreEventsException.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +class NoMoreEventsException extends RuntimeException { + public NoMoreEventsException(String s) { + super(s); + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/StreamProcessingTest.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/StreamProcessingTest.java new file mode 100644 index 00000000..3ce14471 --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/StreamProcessingTest.java @@ -0,0 +1,544 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; +import com.google.gson.reflect.TypeToken; +import io.pravega.client.ClientConfig; +import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.admin.ReaderGroupManager; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.EventStreamReader; +import io.pravega.client.stream.EventStreamWriter; +import io.pravega.client.stream.EventWriterConfig; +import io.pravega.client.stream.ReaderConfig; +import io.pravega.client.stream.ReaderGroupConfig; +import io.pravega.client.stream.Serializer; +import io.pravega.client.stream.Stream; +import io.pravega.utils.EventStreamReaderIterator; +import io.pravega.utils.SetupUtils; +import lombok.Builder; +import lombok.Cleanup; +import lombok.RequiredArgsConstructor; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class StreamProcessingTest { + static final Logger log = LoggerFactory.getLogger(StreamProcessingTest.class); + + protected static final AtomicReference SETUP_UTILS = new AtomicReference<>(); + + @BeforeClass + public static void setup() throws Exception { + SETUP_UTILS.set(new SetupUtils()); + SETUP_UTILS.get().startAllServices(); + } + + @AfterClass + public static void tearDown() throws Exception { + SETUP_UTILS.get().stopAllServices(); + } + + void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @RequiredArgsConstructor + static class TestContext { + final EventStreamWriter writer; + final EventStreamReaderIterator readerIterator; + final TestEventGenerator generator; + final TestEventValidator validator; + final WorkerProcessGroup workerProcessGroup; + final long checkpointPeriodMs; + } + + /** + * Write the given number of events to the Pravega input stream. + * + * @param ctx provides access to the generator, writer, etc. + * @param numEvents number of events to write + */ + void writeEvents(TestContext ctx, int numEvents) { + Iterators.limit(ctx.generator, numEvents).forEachRemaining(event -> ctx.writer.writeEvent(Integer.toString(event.key), event)); + } + + /** + * Read events from the Pravega output stream to ensure that the processors produced the correct result. + * + * @param ctx provides access to the validator, reader, etc. + * @param expectedInstanceIds All read events must have a processedByInstanceId in this set. 
+ */ + void validateEvents(TestContext ctx, int[] expectedInstanceIds) { + log.info("validateEvents: BEGIN"); + // Read events from output stream. Return when complete or throw exception if out of order or timeout. + ctx.validator.clearCounters(); + ctx.validator.validate(ctx.readerIterator, ctx.generator.getLastSequenceNumbers()); + // Confirm that only instances in expectedInstanceIds have processed the events. + final Map eventCountByInstanceId = ctx.validator.getEventCountByInstanceId(); + final Set actualInstanceIds = eventCountByInstanceId.keySet(); + final Set expectedInstanceIdsSet = Arrays.stream(expectedInstanceIds).boxed().collect(Collectors.toCollection(HashSet::new)); + log.info("validateEvents: eventCountByInstanceId={}, expectedInstanceIdsSet={}", eventCountByInstanceId, expectedInstanceIdsSet); + Assert.assertTrue(MessageFormat.format("eventCountByInstanceId={0}, expectedInstanceIdsSet={1}", eventCountByInstanceId, expectedInstanceIdsSet), + Sets.difference(actualInstanceIds, expectedInstanceIdsSet).isEmpty()); + // Warn if any instances are idle. This cannot be an assertion because this may happen under normal conditions. + final Sets.SetView idleInstanceIds = Sets.difference(expectedInstanceIdsSet, actualInstanceIds); + if (!idleInstanceIds.isEmpty()) { + log.warn("validateEvents: Some instances processed no events; eventCountByInstanceId={}, expectedInstanceIdsSet={}", + eventCountByInstanceId, expectedInstanceIdsSet); + } + log.info("validateEvents: END"); + } + + /** + * Write the given number of events to the Pravega input stream. + * Then read events from the Pravega output stream to ensure that the processors produced the correct result. + * + * @param ctx provides access to the generator, writer, etc. + * @param numEvents number of events to write + * @param expectedInstanceIds All read events must have a processedByInstanceId in this set. 
+ */ + void writeEventsAndValidate(TestContext ctx, int numEvents, int[] expectedInstanceIds) { + writeEvents(ctx, numEvents); + validateEvents(ctx, expectedInstanceIds); + } + + /** + * Write events to a Pravega stream, read events from the same stream, and validate expected ordering. + */ + @Test + public void noProcessorTest() throws Exception { + final String methodName = (new Object() {}).getClass().getEnclosingMethod().getName(); + log.info("Test case: {}", methodName); + + // Create stream. + final String scope = SETUP_UTILS.get().getScope(); + final ClientConfig clientConfig = SETUP_UTILS.get().getClientConfig(); + final String inputStreamName = "stream-" + UUID.randomUUID().toString(); + SETUP_UTILS.get().createTestStream(inputStreamName, 6); + @Cleanup + final EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, clientConfig); + + // Prepare writer that will write to the stream. + final Serializer serializer = new JSONSerializer<>(new TypeToken() {}.getType()); + final EventWriterConfig eventWriterConfig = EventWriterConfig.builder().build(); + @Cleanup + final EventStreamWriter writer = clientFactory.createEventWriter(inputStreamName, serializer, eventWriterConfig); + + // Prepare reader that will read from the stream. 
+ final String outputStreamName = inputStreamName; + final String readerGroup = "rg" + UUID.randomUUID().toString().replace("-", ""); + final String readerId = "reader-" + UUID.randomUUID().toString(); + final ReaderConfig readerConfig = ReaderConfig.builder().build(); + final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() + .stream(SETUP_UTILS.get().getStream(outputStreamName)) + .build(); + @Cleanup + final ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, clientConfig); + readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); + @Cleanup + final EventStreamReader reader = clientFactory.createReader( + readerId, + readerGroup, + new JSONSerializer<>(new TypeToken() {}.getType()), + readerConfig); + EventStreamReaderIterator readerIterator = new EventStreamReaderIterator<>(reader, 30000); + + final TestEventGenerator generator = new TestEventGenerator(6); + final TestEventValidator validator = new TestEventValidator(); + final TestContext ctx = new TestContext(writer, readerIterator, generator, validator, null, 0); + + writeEventsAndValidate(ctx, 13, new int[]{-1}); + writeEventsAndValidate(ctx, 3, new int[]{-1}); + writeEventsAndValidate(ctx, 15, new int[]{-1}); + } + + @Builder + protected static class EndToEndTestConfig { + // number of stream segments + @Builder.Default + public final int numSegments = 1; + // number of unique routing keys + @Builder.Default + public final int numKeys = 1; + // number of initial processor instances + @Builder.Default + public final int numInitialInstances = 1; + @Builder.Default + public final long checkpointPeriodMs = 1000; + @Builder.Default + public final long heartbeatIntervalMillis = 1000; + @Builder.Default + public final WriteMode writeMode = WriteMode.Default; + // test-specific function to write and validate events, etc. + @Builder.Default + public final Consumer func = (ctx) -> {}; + } + + /** + * Write events to the input stream. 
Start multiple processors which can read from the input stream + * and write to the output stream. Validate events in the output stream. + * This method performs the setup and teardown. The provided function func performs the actual write and read + * of events; stops, pauses, and starts processor instances; and validates results. + */ + private void run(EndToEndTestConfig config) throws Exception { + final String methodName = (new Object() {}).getClass().getEnclosingMethod().getName(); + log.info("Test case: {}: BEGIN", methodName); + + final String scope = SETUP_UTILS.get().getScope(); + final ClientConfig clientConfig = SETUP_UTILS.get().getClientConfig(); + final String inputStreamName = "input-stream-" + UUID.randomUUID().toString(); + final String outputStreamName = "output-stream-" + UUID.randomUUID().toString(); + final String membershipSynchronizerStreamName = "ms-" + UUID.randomUUID().toString(); + final String inputStreamReaderGroupName = "rg" + UUID.randomUUID().toString().replace("-", ""); + + @Cleanup + StreamManager streamManager = StreamManager.create(clientConfig); + + final WorkerProcessConfig workerProcessConfig = WorkerProcessConfig.builder() + .scope(scope) + .clientConfig(clientConfig) + .readerGroupName(inputStreamReaderGroupName) + .inputStreamName(inputStreamName) + .outputStreamName(outputStreamName) + .membershipSynchronizerStreamName(membershipSynchronizerStreamName) + .numSegments(config.numSegments) + .checkpointPeriodMs(config.checkpointPeriodMs) + .heartbeatIntervalMillis(config.heartbeatIntervalMillis) + .writeMode(config.writeMode) + .build(); + @Cleanup + final WorkerProcessGroup workerProcessGroup = WorkerProcessGroup.builder().config(workerProcessConfig).build(); + + // Start initial set of processors. This will also create the necessary streams. + workerProcessGroup.start(IntStream.range(0, config.numInitialInstances).toArray()); + + // Prepare generator writer that will write to the stream read by the processor. 
+ @Cleanup + final EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, clientConfig); + final Serializer serializer = new JSONSerializer<>(new TypeToken(){}.getType()); + final EventWriterConfig eventWriterConfig = EventWriterConfig.builder().build(); + @Cleanup + final EventStreamWriter writer = clientFactory.createEventWriter(inputStreamName, serializer, eventWriterConfig); + + // Prepare validation reader that will read from the stream written by the processor. + final String validationReaderGroupName = "rg" + UUID.randomUUID().toString().replace("-", ""); + final String validationReaderId = "reader-" + UUID.randomUUID().toString(); + final ReaderConfig validationReaderConfig = ReaderConfig.builder().build(); + final ReaderGroupConfig validationReaderGroupConfig = ReaderGroupConfig.builder() + .stream(Stream.of(scope, outputStreamName)) + .build(); + @Cleanup + final ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, clientConfig); + readerGroupManager.createReaderGroup(validationReaderGroupName, validationReaderGroupConfig); + @Cleanup + final EventStreamReader validationReader = clientFactory.createReader( + validationReaderId, + validationReaderGroupName, + new JSONSerializer<>(new TypeToken(){}.getType()), + validationReaderConfig); + final long readTimeoutMills = 60000; + EventStreamReaderIterator readerIterator = new EventStreamReaderIterator<>(validationReader, readTimeoutMills); + final TestEventGenerator generator = new TestEventGenerator(config.numKeys); + final TestEventValidator validator = new TestEventValidator(); + final TestContext ctx = new TestContext(writer, readerIterator, generator, validator, workerProcessGroup, config.checkpointPeriodMs); + config.func.accept(ctx); + + log.info("Test case: {}: CLEANUP", methodName); + workerProcessGroup.close(); + validationReader.close(); + readerGroupManager.deleteReaderGroup(inputStreamReaderGroupName); + 
readerGroupManager.deleteReaderGroup(validationReaderGroupName); + streamManager.sealStream(scope, inputStreamName); + streamManager.sealStream(scope, outputStreamName); + streamManager.sealStream(scope, membershipSynchronizerStreamName); + streamManager.deleteStream(scope, inputStreamName); + streamManager.deleteStream(scope, outputStreamName); + streamManager.deleteStream(scope, membershipSynchronizerStreamName); + log.info("Test case: {}: END", methodName); + } + + /** + * Write 20 events with 1 routing key, run 1 processor instance, and validate results. + * @throws Exception + */ + @Test + public void trivialTest() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(1) + .numKeys(1) + .numInitialInstances(1) + .writeMode(WriteMode.Default) + .func(ctx -> { + writeEventsAndValidate(ctx, 20, new int[]{0}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + }).build()); + } + + /** + * Gracefully stop 1 of 1 processor instances. + */ + @Test + public void gracefulRestart1of1Test() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(6) + .numKeys(24) + .numInitialInstances(1) + .writeMode(WriteMode.Default) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0}); + ctx.workerProcessGroup.stop(0); + ctx.workerProcessGroup.start(1); + writeEventsAndValidate(ctx, 90, new int[]{1}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + }).build()); + } + + /** + * Gracefully stop 1 of 1 processor instances. + * Force each event to be durably written immediately. 
+ */ + @Test + public void gracefulRestart1of1DurableTest() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(6) + .numKeys(24) + .numInitialInstances(1) + .writeMode(WriteMode.AlwaysDurable) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0}); + ctx.workerProcessGroup.stop(0); + ctx.workerProcessGroup.start(1); + writeEventsAndValidate(ctx, 90, new int[]{1}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + }).build()); + } + + /** + * Gracefully stop 1 of 1 processor instances. + * Do not write events to Pravega until flushed. + */ + @Test + public void gracefulRestart1of1DHoldUntilFlushedTest() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(6) + .numKeys(24) + .numInitialInstances(1) + .writeMode(WriteMode.AlwaysHoldUntilFlushed) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0}); + ctx.workerProcessGroup.stop(0); + ctx.workerProcessGroup.start(1); + writeEventsAndValidate(ctx, 90, new int[]{1}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + }).build()); + } + + /** + * Gracefully stop 1 of 2 processor instances. + */ + @Test + public void gracefulStop1of2Test() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(6) + .numKeys(24) + .numInitialInstances(2) + .writeMode(WriteMode.Default) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0, 1}); + ctx.workerProcessGroup.stop(0); + writeEventsAndValidate(ctx, 90, new int[]{1}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + }).build()); + } + + /** + * Kill and restart 1 of 1 processor instances. + * This is simulated by pausing the first instance until the test completes. + * This may produce duplicates. 
+ */ + @Test + public void killAndRestart1of1Test() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(6) + .numKeys(24) + .numInitialInstances(1) + .writeMode(WriteMode.Default) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0}); + ctx.workerProcessGroup.get(0).pause(); + ctx.workerProcessGroup.start(1); + writeEventsAndValidate(ctx, 90, new int[]{1}); + log.info("getDuplicateEventCount={}", ctx.validator.getDuplicateEventCount()); + }).build()); + } + + /** + * Kill and restart 1 of 1 processor instances. + * This is simulated by pausing the first instance until the test completes. + * This test waits for 2x the checkpoint interval to ensure that the + * first processor writes its position before pausing. + * This will not produce duplicates. + */ + @Test + public void killAndRestart1of1WhenIdleTest() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(6) + .numKeys(24) + .numInitialInstances(1) + .writeMode(WriteMode.Default) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + // Wait for a while to ensure that a checkpoint occurs and all events have been flushed. + // This will update the reader group state to indicate that this reader has read up to this point. + sleep(2*ctx.checkpointPeriodMs); + // Kill the worker instance. + ctx.workerProcessGroup.get(0).pause(); + // Start a new worker instance. It should identify the dead worker and call readerOffline(null). + // The new worker should resume exactly where the killed worker left off, producing no duplicates. 
+ ctx.workerProcessGroup.start(1); + writeEventsAndValidate(ctx, 19, new int[]{1}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + }).build()); + } + + @Test(timeout = 2*60*1000) + public void handleExceptionDuringProcessTest() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(1) + .numKeys(1) + .numInitialInstances(1) + .writeMode(WriteMode.AlwaysHoldUntilFlushed) + .func(ctx -> { + // Force process function to throw an exception. + ctx.workerProcessGroup.get(0).induceFailureDuringProcess(); + // Write an event. + writeEvents(ctx, 1); + // Wait for process function to throw an exception. + sleep(2*ctx.checkpointPeriodMs); + // Start a new worker instance so that we can determine where it reads from. + // The event should have been processed exactly once. + ctx.workerProcessGroup.start(1); + validateEvents(ctx, new int[]{1}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + }).build()); + } + + @Test(timeout = 2*60*1000) + public void handleExceptionDuringFlushTest() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(6) + .numKeys(24) + .numInitialInstances(1) + .writeMode(WriteMode.AlwaysHoldUntilFlushed) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + // Wait for a while to ensure that a checkpoint occurs and all events have been flushed. + // This will update the reader group state to indicate that this reader has read up to this point. + sleep(2*ctx.checkpointPeriodMs); + // Force an exception during flush. + ctx.workerProcessGroup.get(0).induceFailureDuringFlush(); + // Write some events that will be processed and queued for writing (they will not be written though). + final int newEventCount = 3; + writeEvents(ctx, newEventCount); + // Wait for a while so that flush is called and throws an exception. 
+ sleep(2*ctx.checkpointPeriodMs); + // Start a new worker instance so that we can determine where it reads from. + ctx.workerProcessGroup.start(1); + validateEvents(ctx, new int[]{1}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + }).build()); + } + + /** + * This test processes events and then kills the processor before it receives a checkpoint. + * This is expected to produce duplicates. + */ + @Test(timeout = 3*60*1000) + public void killAndRestart1of1ForcingDuplicatesTest() throws Exception { + for (;;) { + try { + run(EndToEndTestConfig.builder() + .numSegments(1) + .numKeys(1) + .numInitialInstances(1) + .writeMode(WriteMode.AlwaysDurable) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + // Wait for a while to ensure that a checkpoint occurs and all events have been flushed. + // This will update the reader group state to indicate that this reader has read up to this point. + sleep(2*ctx.checkpointPeriodMs); + // Write some events that will be processed and durably written. + final int expectedDuplicateEventCount = 3; + writeEventsAndValidate(ctx, expectedDuplicateEventCount, new int[]{0}); + Assert.assertEquals(0, ctx.validator.getDuplicateEventCount()); + // Kill the worker instance before it checkpoints and updates the reader group state. + // There is no control mechanism to prevent the checkpoint. + // If a checkpoint occurs here, this test will fail because duplicates will not be produced. + ctx.workerProcessGroup.get(0).pause(); + // Start a new worker instance. It should identify the dead worker and call readerOffline(null). + // The new worker should produce duplicates. 
+ ctx.workerProcessGroup.start(1); + writeEventsAndValidate(ctx, 90, new int[]{1}); + Assert.assertEquals(expectedDuplicateEventCount, ctx.validator.getDuplicateEventCount()); + }).build()); + break; + } catch (AssertionError e) { + // This test will occasionally fail because a checkpoint may have occurred. + // Such a failure should be rare so retrying should eventually work. + log.warn("Retrying failed test", e); + } + } + } + + /** + * Kill 5 of 6 processor instances. + */ + @Test + public void kill5of6Test() throws Exception { + run(EndToEndTestConfig.builder() + .numSegments(6) + .numKeys(24) + .numInitialInstances(6) + .heartbeatIntervalMillis(200) + .writeMode(WriteMode.Default) + .func(ctx -> { + writeEventsAndValidate(ctx, 100, new int[]{0, 1, 2, 3, 4, 5}); + log.info("kill5of6Test: PAUSING WORKERS"); + IntStream.rangeClosed(0, 4).forEach(i -> ctx.workerProcessGroup.get(i).pause()); + log.info("kill5of6Test: WRITING MORE EVENTS"); + writeEventsAndValidate(ctx, 90, new int[]{5}); + }).build()); + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEvent.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEvent.java new file mode 100644 index 00000000..cd41b4cf --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +public class TestEvent { + public int key; + public long sequenceNumber; + public int processedByInstanceId; + + public TestEvent(int key, long sequenceNumber) { + this.key = key; + this.sequenceNumber = sequenceNumber; + this.processedByInstanceId = -1; + } + + public TestEvent(int key, long sequenceNumber, int processedByInstanceId) { + this.key = key; + this.sequenceNumber = sequenceNumber; + this.processedByInstanceId = processedByInstanceId; + } + + @Override + public String toString() { + return "TestEvent{" + + "key=" + key + + ", sequenceNumber=" + sequenceNumber + + ", processedByInstanceId=" + processedByInstanceId + + '}'; + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventGenerator.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventGenerator.java new file mode 100644 index 00000000..26128644 --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventGenerator.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * An Iterator that generates TestEvent elements. + * Events will have routing keys between 0 and numKeys - 1. + * For each routing key, sequence numbers will begin at 0 and increment by 1. 
+ */ +public class TestEventGenerator implements Iterator { + static final Logger log = LoggerFactory.getLogger(TestEventGenerator.class); + + private final int numKeys; + private int lastKey; + // map from routing key to sequence number + private final Map lastSequenceNumbers; + + public TestEventGenerator(int numKeys) { + this.numKeys = numKeys; + this.lastKey = numKeys - 1; + lastSequenceNumbers = new HashMap<>(); + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public TestEvent next() { + lastKey = (lastKey + 1) % numKeys;; + final long sequenceNumber = lastSequenceNumbers.getOrDefault(lastKey, -1L) + 1; + lastSequenceNumbers.put(lastKey, sequenceNumber); + final TestEvent event = new TestEvent(lastKey, sequenceNumber); + log.info("event={}", event); + return event; + } + + public Map getLastSequenceNumbers() { + return new HashMap<>(lastSequenceNumbers); + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventValidator.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventValidator.java new file mode 100644 index 00000000..7551faef --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventValidator.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * This validates that the sequence of events provided is consistent with the guarantees provided by the + * AtLeastOnceProcessor. 
+ */ +public class TestEventValidator { + static final Logger log = LoggerFactory.getLogger(TestEventValidator.class); + + // Map from routing key to highest received sequence number. + private final Map receivedSequenceNumbers = new HashMap<>(); + // Map from instanceId to count of events processed by the instance, excluding duplicates. + private final Map eventCountByInstanceId = new HashMap<>(); + private long duplicateEventCount; + + public void validate(Iterator events, Map expectedLastSequenceNumbers) { + // pendingSequenceNumbers contains a map from key to sequence number for events that have been generated but not yet received by validate. + // A key is removed when all events up to the generated sequence number for that key are received. + final Map pendingSequenceNumbers = new HashMap<>(); + expectedLastSequenceNumbers.forEach((key, sequenceNumber) -> { + if (receivedSequenceNumbers.getOrDefault(key, -1L) < sequenceNumber) { + pendingSequenceNumbers.put(key, sequenceNumber); + } + }); + log.info("pendingSequenceNumbers={}", pendingSequenceNumbers); + while (events.hasNext()) { + final TestEvent event = events.next(); + final long lastReceivedSequenceNumber = receivedSequenceNumbers.getOrDefault(event.key, -1L); + log.info("event={}, lastReceivedSequenceNumber={}", event, lastReceivedSequenceNumber); + if (event.sequenceNumber <= lastReceivedSequenceNumber) { + log.warn("Duplicate event; event={}, lastReceivedSequenceNumber={}", event, lastReceivedSequenceNumber); + duplicateEventCount++; + } else if (event.sequenceNumber > lastReceivedSequenceNumber + 1) { + throw new MissingEventException("Detected missing event"); + } else { + receivedSequenceNumbers.put(event.key, event.sequenceNumber); + eventCountByInstanceId.merge(event.processedByInstanceId, 1L, Long::sum); // increment counter + if (pendingSequenceNumbers.getOrDefault(event.key, -1L) <= event.sequenceNumber) { + pendingSequenceNumbers.remove(event.key); + if (pendingSequenceNumbers.size() == 0) { 
+ log.info("All data received; receivedSequenceNumbers={}", receivedSequenceNumbers); + return; + } + } + } + } + final String msg = MessageFormat.format( + "No more events but all expected events were not received; " + + "receivedSequenceNumbers={0}, expectedLastSequenceNumbers={1}", + receivedSequenceNumbers, expectedLastSequenceNumbers); + log.error(msg); + throw new NoMoreEventsException(msg); + } + + public void clearCounters() { + eventCountByInstanceId.clear(); + duplicateEventCount = 0; + } + + public Map getEventCountByInstanceId() { + return eventCountByInstanceId; + } + + public long getDuplicateEventCount() { + return duplicateEventCount; + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventValidatorUnitTests.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventValidatorUnitTests.java new file mode 100644 index 00000000..6ff6fbfb --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/TestEventValidatorUnitTests.java @@ -0,0 +1,161 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + */ +package io.pravega.example.streamprocessing; + +import com.google.common.collect.ImmutableMap; +import io.pravega.test.common.AssertExtensions; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Unit tests for TestEventValidator. 
+ */ +public class TestEventValidatorUnitTests { + static final Logger log = LoggerFactory.getLogger(TestEventValidatorUnitTests.class); + + @Test + public void exactlyOnceTest() { + List events = Arrays.asList( + new TestEvent(0, 0L, 10), + new TestEvent(1, 0L, 10), + new TestEvent(1, 1L, 10), + new TestEvent(0, 1L, 10), + new TestEvent(0, 2L, 11), + new TestEvent(0, 3L, 11), + new TestEvent(1, 2L, 11)); + Map expectedLastSequenceNumbers = ImmutableMap.of( + 0, 3L, + 1, 2L); + Map expectedEventCountByInstanceId = ImmutableMap.of( + 10, 4L, + 11, 3L); + TestEventValidator validator = new TestEventValidator(); + validator.validate(events.iterator(), expectedLastSequenceNumbers); + Assert.assertEquals(expectedEventCountByInstanceId, validator.getEventCountByInstanceId()); + Assert.assertEquals(0, validator.getDuplicateEventCount()); + } + + @Test + public void duplicateEventTest() { + List events = Arrays.asList( + new TestEvent(0, 0L, 10), + new TestEvent(1, 0L, 10), + new TestEvent(1, 1L, 10), + new TestEvent(0, 0L, 11), // duplicate + new TestEvent(0, 1L, 11), + new TestEvent(0, 2L, 11), + new TestEvent(0, 3L, 11), + new TestEvent(1, 2L, 11)); + Map expectedLastSequenceNumbers = ImmutableMap.of( + 0, 3L, + 1, 2L); + Map expectedEventCountByInstanceId = ImmutableMap.of( + 10, 3L, + 11, 4L); + TestEventValidator validator = new TestEventValidator(); + validator.validate(events.iterator(), expectedLastSequenceNumbers); + Assert.assertEquals(expectedEventCountByInstanceId, validator.getEventCountByInstanceId()); + Assert.assertEquals(1, validator.getDuplicateEventCount()); + } + + @Test + public void rewindTest() { + List events = Arrays.asList( + new TestEvent(0, 0L, 10), + new TestEvent(1, 0L, 10), + new TestEvent(1, 1L, 10), + new TestEvent(1, 0L, 11), // rewind, duplicate + new TestEvent(1, 1L, 11), // duplicate + new TestEvent(0, 1L, 11), + new TestEvent(0, 2L, 11), + new TestEvent(0, 3L, 11), + new TestEvent(1, 2L, 11)); + Map expectedLastSequenceNumbers = 
ImmutableMap.of( + 0, 3L, + 1, 2L); + Map expectedEventCountByInstanceId = ImmutableMap.of( + 10, 3L, + 11, 4L); + TestEventValidator validator = new TestEventValidator(); + validator.validate(events.iterator(), expectedLastSequenceNumbers); + Assert.assertEquals(expectedEventCountByInstanceId, validator.getEventCountByInstanceId()); + Assert.assertEquals(2, validator.getDuplicateEventCount()); + } + + @Test + public void clearCountersTest() { + List events1 = Arrays.asList( + new TestEvent(0, 0L, 10), + new TestEvent(1, 0L, 10), + new TestEvent(1, 1L, 10), + new TestEvent(1, 0L, 11), // rewind, duplicate + new TestEvent(1, 1L, 11), // duplicate + new TestEvent(0, 1L, 11), + new TestEvent(0, 2L, 11), + new TestEvent(0, 3L, 11), + new TestEvent(1, 2L, 11)); + Map expectedLastSequenceNumbers1 = ImmutableMap.of( + 0, 3L, + 1, 2L); + TestEventValidator validator = new TestEventValidator(); + validator.validate(events1.iterator(), expectedLastSequenceNumbers1); + validator.clearCounters(); + List events2 = Arrays.asList( + new TestEvent(100, 0L, 10), + new TestEvent(100, 0L, 10), // duplicate + new TestEvent(100, 1L, 10)); + Map expectedLastSequenceNumbers2 = ImmutableMap.of( + 100, 1L); + Map expectedEventCountByInstanceId2 = ImmutableMap.of( + 10, 2L); + validator.validate(events2.iterator(), expectedLastSequenceNumbers2); + Assert.assertEquals(expectedEventCountByInstanceId2, validator.getEventCountByInstanceId()); + Assert.assertEquals(1, validator.getDuplicateEventCount()); + } + + @Test + public void missingEventTest() { + List events = Arrays.asList( + new TestEvent(0, 0L, 10), + new TestEvent(1, 0L, 10), + new TestEvent(1, 1L, 10), + new TestEvent(0, 2L, 11), // missing prior event + new TestEvent(0, 3L, 11), + new TestEvent(1, 2L, 11)); + Map expectedLastSequenceNumbers = ImmutableMap.of( + 0, 3L, + 1, 2L); + TestEventValidator validator = new TestEventValidator(); + AssertExtensions.assertThrows(MissingEventException.class, () -> 
validator.validate(events.iterator(), expectedLastSequenceNumbers)); + } + + @Test + public void missingFinalEventTest() { + List events = Arrays.asList( + new TestEvent(0, 0L, 10), + new TestEvent(1, 0L, 10), + new TestEvent(1, 1L, 10), // missing following event + new TestEvent(0, 1L, 10), + new TestEvent(0, 2L, 11), + new TestEvent(0, 3L, 11)); + Map expectedLastSequenceNumbers = ImmutableMap.of( + 0, 3L, + 1, 2L); + TestEventValidator validator = new TestEventValidator(); + AssertExtensions.assertThrows(NoMoreEventsException.class, () -> validator.validate(events.iterator(), expectedLastSequenceNumbers)); + } +} diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcess.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcess.java new file mode 100644 index 00000000..78c4fec9 --- /dev/null +++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcess.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+package io.pravega.example.streamprocessing;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.gson.reflect.TypeToken;
+import io.pravega.client.EventStreamClientFactory;
+import io.pravega.client.SynchronizerClientFactory;
+import io.pravega.client.admin.ReaderGroupManager;
+import io.pravega.client.admin.StreamManager;
+import io.pravega.client.stream.EventStreamWriter;
+import io.pravega.client.stream.EventWriterConfig;
+import io.pravega.client.stream.ReaderConfig;
+import io.pravega.client.stream.ReaderGroup;
+import io.pravega.client.stream.ReaderGroupConfig;
+import io.pravega.client.stream.ScalingPolicy;
+import io.pravega.client.stream.Serializer;
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.StreamConfiguration;
+import lombok.Builder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This simulates an independent process running an AtLeastOnceProcessor.
+ * <p>
+ * {@link #init()} creates the scope, streams and reader group. The service thread ({@link #run()})
+ * then builds an {@link AtLeastOnceProcessorInstrumented}, starts it and blocks until it terminates.
+ * The remaining public methods forward test controls to the processor that is currently running.
+ */
+@Builder
+public class WorkerProcess extends AbstractExecutionThreadService {
+    private static final Logger log = LoggerFactory.getLogger(WorkerProcess.class);
+
+    private final WorkerProcessConfig config;
+    private final int instanceId;
+
+    // The processor created by run(); null before run() has created it and after triggerShutdown().
+    private final AtomicReference<AtLeastOnceProcessorInstrumented> processor = new AtomicReference<>();
+
+    /**
+     * Create the input, output, and state synchronizer streams (ignored if they already exist),
+     * then create the reader group over the input stream.
+     */
+    public void init() {
+        log.info("init: BEGIN");
+        try (StreamManager streamManager = StreamManager.create(config.clientConfig)) {
+            streamManager.createScope(config.scope);
+            final StreamConfiguration streamConfig = StreamConfiguration.builder()
+                    .scalingPolicy(ScalingPolicy.fixed(config.numSegments)).build();
+            streamManager.createStream(config.scope, config.inputStreamName, streamConfig);
+            streamManager.createStream(config.scope, config.outputStreamName, streamConfig);
+            // The membership synchronizer stream always uses a single fixed segment.
+            streamManager.createStream(
+                    config.scope,
+                    config.membershipSynchronizerStreamName,
+                    StreamConfiguration.builder().scalingPolicy(ScalingPolicy.fixed(1)).build());
+        }
+        final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
+                .stream(Stream.of(config.scope, config.inputStreamName))
+                .automaticCheckpointIntervalMillis(config.checkpointPeriodMs)
+                .build();
+        try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(config.scope, config.clientConfig)) {
+            // Create the Reader Group (ignored if it already exists)
+            readerGroupManager.createReaderGroup(config.readerGroupName, readerGroupConfig);
+        }
+        log.info("init: END");
+    }
+
+    /**
+     * Builds the client factories, the output writer and the instrumented processor, publishes the
+     * processor for the control methods, and blocks until the processor has terminated.
+     *
+     * @throws Exception on any failure while creating clients or running the processor.
+     */
+    @Override
+    protected void run() throws Exception {
+        // NOTE(review): the generic type arguments of Serializer, TypeToken and EventStreamWriter are not
+        // visible in this copy of the file (they look stripped); restore the event type before compiling.
+        Serializer serializer = new JSONSerializer<>(new TypeToken() {}.getType());
+        try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(config.scope, config.clientConfig)) {
+            final ReaderGroup readerGroup = readerGroupManager.getReaderGroup(config.readerGroupName);
+            // Create client factories.
+            try (EventStreamClientFactory eventStreamClientFactory = EventStreamClientFactory.withScope(config.scope, config.clientConfig);
+                 SynchronizerClientFactory synchronizerClientFactory = SynchronizerClientFactory.withScope(config.scope, config.clientConfig);
+                 // Create a Pravega stream writer that we will send our processed output to.
+                 EventStreamWriter writer = eventStreamClientFactory.createEventWriter(
+                         config.outputStreamName,
+                         serializer,
+                         EventWriterConfig.builder().build())) {
+
+                // Both the pruner and the reader are identified by this worker's instanceId.
+                final AtLeastOnceProcessorInstrumented proc = new AtLeastOnceProcessorInstrumented(
+                        () -> ReaderGroupPruner.create(
+                                readerGroup,
+                                config.membershipSynchronizerStreamName,
+                                Integer.toString(instanceId),
+                                synchronizerClientFactory,
+                                Executors.newScheduledThreadPool(1),
+                                config.heartbeatIntervalMillis),
+                        () -> eventStreamClientFactory.createReader(
+                                Integer.toString(instanceId),
+                                readerGroup.getGroupName(),
+                                serializer,
+                                ReaderConfig.builder().build()),
+                        config.readTimeoutMillis,
+                        config.writeMode,
+                        instanceId,
+                        writer);
+                processor.set(proc);
+                proc.startAsync();
+                proc.awaitTerminated();
+            }
+        }
+    }
+
+    /**
+     * Asks the current processor (if any) to stop. The reference is cleared first, so the control
+     * methods below fail fast once shutdown has begun.
+     */
+    @Override
+    protected void triggerShutdown() {
+        log.info("triggerShutdown: BEGIN");
+        final AtLeastOnceProcessorInstrumented proc = processor.getAndSet(null);
+        if (proc != null) {
+            proc.stopAsync();
+            // NOTE(review): presumably unpause() lets a paused processor observe the stop request - confirm.
+            proc.unpause();
+        }
+        log.info("triggerShutdown: END");
+    }
+
+    /**
+     * Pauses the running processor.
+     *
+     * @throws IllegalStateException if no processor is currently active.
+     */
+    public void pause() {
+        requireProcessor().pause();
+    }
+
+    /**
+     * Forwards {@code induceFailureDuringFlush()} to the running processor.
+     *
+     * @throws IllegalStateException if no processor is currently active.
+     */
+    public void induceFailureDuringFlush() {
+        requireProcessor().induceFailureDuringFlush();
+    }
+
+    /**
+     * Forwards {@code induceFailureDuringProcess()} to the running processor.
+     *
+     * @throws IllegalStateException if no processor is currently active.
+     */
+    public void induceFailureDuringProcess() {
+        requireProcessor().induceFailureDuringProcess();
+    }
+
+    /**
+     * Changes the write mode of the running processor.
+     *
+     * @param mode the write mode to apply.
+     * @throws IllegalStateException if no processor is currently active.
+     */
+    public void setWriteMode(WriteMode mode) {
+        requireProcessor().setWriteMode(mode);
+    }
+
+    // Returns the active processor, or fails with a descriptive error instead of a bare NullPointerException.
+    private AtLeastOnceProcessorInstrumented requireProcessor() {
+        final AtLeastOnceProcessorInstrumented proc = processor.get();
+        if (proc == null) {
+            throw new IllegalStateException(
+                    "WorkerProcess " + instanceId + " has no active processor (not yet running or already shut down)");
+        }
+        return proc;
+    }
+}
diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcessConfig.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcessConfig.java
new file mode 100644
index 00000000..16d79b6e
--- /dev/null
+++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcessConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+package io.pravega.example.streamprocessing;
+
+import io.pravega.client.ClientConfig;
+import lombok.Builder;
+
+/**
+ * Immutable settings consumed by {@link WorkerProcess}; instances are created through the
+ * Lombok-generated builder.
+ */
+@Builder
+public class WorkerProcessConfig {
+    /** Scope in which WorkerProcess creates the streams and the reader group. */
+    public final String scope;
+    /** Client configuration used for the stream manager, reader group manager and client factories. */
+    public final ClientConfig clientConfig;
+    /** Name of the reader group created over the input stream. */
+    public final String readerGroupName;
+    /** Name of the stream the reader group reads from. */
+    public final String inputStreamName;
+    /** Name of the stream the worker's event writer writes to. */
+    public final String outputStreamName;
+    /** Name of the single-segment stream handed to ReaderGroupPruner. */
+    public final String membershipSynchronizerStreamName;
+    /** Fixed number of segments used when creating the input and output streams. */
+    public final int numSegments;
+    /** Automatic checkpoint interval of the reader group, in milliseconds. */
+    @Builder.Default
+    public final long checkpointPeriodMs = 1000;
+    /** Heartbeat interval passed to ReaderGroupPruner, in milliseconds. */
+    @Builder.Default
+    public final long heartbeatIntervalMillis = 1000;
+    /** Read timeout passed to the instrumented processor, in milliseconds. */
+    @Builder.Default
+    public final long readTimeoutMillis = 1000;
+    /** Initial write mode passed to the instrumented processor. */
+    @Builder.Default
+    public final WriteMode writeMode = WriteMode.Default;
+}
diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcessGroup.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcessGroup.java
new file mode 100644
index 00000000..d9675495
--- /dev/null
+++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WorkerProcessGroup.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+package io.pravega.example.streamprocessing;
+
+import lombok.Builder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * This manages a group of WorkerProcess instances.
+ * The testing framework in {@link StreamProcessingTest} uses this class to
+ * start and stop multiple instances of {@link AtLeastOnceProcessorInstrumented}
+ * in a single process.
+ * Workers are keyed by their integer instance id. The backing map is a plain HashMap,
+ * so start/stop calls are expected to come from a single test thread.
+ */
+@Builder
+public class WorkerProcessGroup implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(WorkerProcessGroup.class);
+
+    private final WorkerProcessConfig config;
+    // Currently registered workers, keyed by instance id.
+    private final Map<Integer, WorkerProcess> workers = new HashMap<>();
+    // NOTE(review): not read anywhere in this class; kept because it is part of the generated builder API.
+    private WriteMode writeMode;
+
+    /**
+     * @param instanceId id of the worker to look up.
+     * @return the registered worker, or null if no worker with that id is registered.
+     */
+    public WorkerProcess get(int instanceId) {
+        return workers.get(instanceId);
+    }
+
+    /**
+     * Registers a worker for each id that is not registered yet, then initializes and starts
+     * the workers for all given ids in parallel.
+     * Streams are guaranteed to exist after calling this method.
+     *
+     * @param instanceIds ids of the workers to start.
+     */
+    public void start(int... instanceIds) {
+        // Registration is done sequentially so the HashMap is only read by the parallel phase below.
+        IntStream.of(instanceIds).forEach(instanceId -> {
+            log.info("start: instanceId={}", instanceId);
+            workers.putIfAbsent(instanceId, WorkerProcess.builder().config(config).instanceId(instanceId).build());
+        });
+        IntStream.of(instanceIds).parallel().forEach(instanceId -> {
+            final WorkerProcess worker = workers.get(instanceId);
+            worker.init();
+            worker.startAsync();
+        });
+    }
+
+    /**
+     * Requests all given workers to stop, waits for each to terminate and unregisters them.
+     * Ids that are not registered (or are repeated) are skipped with a warning.
+     * Processors are guaranteed to not process events after this method returns.
+     *
+     * @param instanceIds ids of the workers to stop.
+     */
+    public void stop(int... instanceIds) {
+        // First pass: signal every worker so that they shut down concurrently.
+        IntStream.of(instanceIds).forEach(instanceId -> {
+            final WorkerProcess worker = workers.get(instanceId);
+            if (worker == null) {
+                log.warn("stop: no worker registered for instanceId={}", instanceId);
+            } else {
+                worker.stopAsync();
+            }
+        });
+        // Second pass: unregister and wait; a worker that failed is logged but does not abort the loop.
+        IntStream.of(instanceIds).forEach(instanceId -> {
+            final WorkerProcess worker = workers.remove(instanceId);
+            if (worker != null) {
+                try {
+                    worker.awaitTerminated();
+                } catch (Exception e) {
+                    log.warn("stop", e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @return a snapshot of the ids of all registered workers.
+     */
+    protected int[] getInstanceIds() {
+        return workers.keySet().stream().mapToInt(i -> i).toArray();
+    }
+
+    /**
+     * Stops and unregisters every registered worker.
+     */
+    public void stopAll() {
+        log.info("stopAll: workers={}", workers);
+        stop(getInstanceIds());
+    }
+
+    @Override
+    public void close() throws Exception {
+        stopAll();
+    }
+}
diff --git a/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WriteMode.java b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WriteMode.java
new file mode 100644
index 00000000..0346d7ce
--- /dev/null
+++ b/pravega-client-examples/src/test/java/io/pravega/example/streamprocessing/WriteMode.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+package io.pravega.example.streamprocessing;
+
+public enum WriteMode {
+    Default,
+    AlwaysHoldUntilFlushed,
+    AlwaysDurable,
+}
diff --git a/pravega-client-examples/src/test/java/io/pravega/utils/EventStreamReaderIterator.java b/pravega-client-examples/src/test/java/io/pravega/utils/EventStreamReaderIterator.java
new file mode 100644
index 00000000..0b28be21
--- /dev/null
+++ b/pravega-client-examples/src/test/java/io/pravega/utils/EventStreamReaderIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+package io.pravega.utils;
+
+import io.pravega.client.stream.EventRead;
+import io.pravega.client.stream.EventStreamReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * An Iterator for reading from a Pravega stream.
+ * <p>
+ * Checkpoint reads are skipped. The iterator reports "no more elements" when no event
+ * arrives within the configured timeout; a later call will try to read again.
+ *
+ * @param <T> type of the events returned by the underlying reader.
+ */
+public class EventStreamReaderIterator<T> implements Iterator<T> {
+    static final Logger log = LoggerFactory.getLogger(EventStreamReaderIterator.class);
+
+    private final EventStreamReader<T> reader;
+    private final long timeoutMillis;
+    // Event fetched by hasNext()/next() but not yet handed to the caller.
+    private Optional<T> nextEvent = Optional.empty();
+
+    /**
+     * @param reader        the reader to pull events from.
+     * @param timeoutMillis total time, in milliseconds, to wait for an event per hasNext()/next() call.
+     */
+    public EventStreamReaderIterator(EventStreamReader<T> reader, long timeoutMillis) {
+        this.reader = reader;
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    /**
+     * @return true if an event is buffered or could be read within the timeout.
+     */
+    @Override
+    public boolean hasNext() {
+        readIfNeeded();
+        return nextEvent.isPresent();
+    }
+
+    /**
+     * @return the next event from the stream.
+     * @throws NoSuchElementException if no event could be read within the timeout.
+     */
+    @Override
+    public T next() {
+        readIfNeeded();
+        if (nextEvent.isPresent()) {
+            final T event = nextEvent.get();
+            nextEvent = Optional.empty();
+            return event;
+        } else {
+            // The Iterator contract specifies NoSuchElementException; it is still a RuntimeException.
+            throw new NoSuchElementException("Timeout");
+        }
+    }
+
+    // Fills nextEvent unless one is already buffered. Checkpoints are skipped and the
+    // remaining time budget is recomputed after each one; a null event means timeout.
+    private void readIfNeeded() {
+        if (!nextEvent.isPresent()) {
+            final long t0 = System.nanoTime();
+            long nextTimeoutMillis = timeoutMillis;
+            while (nextTimeoutMillis >= 0) {
+                final EventRead<T> eventRead = reader.readNextEvent(nextTimeoutMillis);
+                if (!eventRead.isCheckpoint()) {
+                    if (eventRead.getEvent() != null) {
+                        nextEvent = Optional.of(eventRead.getEvent());
+                    }
+                    return;
+                }
+                // Elapsed nanoseconds converted to milliseconds and subtracted from the total budget.
+                nextTimeoutMillis = timeoutMillis - (System.nanoTime() - t0) / 1000 / 1000;
+            }
+        }
+    }
+}
diff --git a/pravega-client-examples/src/test/java/io/pravega/utils/SetupUtils.java b/pravega-client-examples/src/test/java/io/pravega/utils/SetupUtils.java
new file mode 100644
index 00000000..a51713dc
--- /dev/null
+++ b/pravega-client-examples/src/test/java/io/pravega/utils/SetupUtils.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright (c) Dell Inc., or its
subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+package io.pravega.utils;
+
+import com.google.common.base.Preconditions;
+import io.pravega.client.ClientConfig;
+import io.pravega.client.admin.StreamManager;
+import io.pravega.client.stream.ScalingPolicy;
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.StreamConfiguration;
+import io.pravega.common.concurrent.ExecutorServiceHelpers;
+import io.pravega.local.InProcPravegaCluster;
+import lombok.Cleanup;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.URI;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utility functions for creating an in-process Pravega server or connecting to an external Pravega server.
+ * <p>
+ * An external controller is used when a URI is supplied (by default from the {@code pravega.uri}
+ * system property); otherwise an in-process cluster is started on locally available ports.
+ */
+@NotThreadSafe
+public final class SetupUtils {
+    static Logger log = LoggerFactory.getLogger(SetupUtils.class);
+    // NOTE(review): not referenced anywhere in this class - confirm whether it can be removed.
+    private static final ScheduledExecutorService DEFAULT_SCHEDULED_EXECUTOR_SERVICE = ExecutorServiceHelpers.newScheduledThreadPool(3, "SetupUtils");
+
+    private final PravegaGateway gateway;
+
+    // Manage the state of the class.
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    // Auth is disabled by default. Set it to true to enable Pravega authentication and authorization.
+    @Setter
+    private boolean enableAuth = false;
+
+    // Set to true to enable TLS
+    @Setter
+    private boolean enableTls = false;
+
+    private boolean enableRestServer = true;
+
+    // The test Scope name.
+    @Getter
+    private final String scope = RandomStringUtils.randomAlphabetic(20);
+
+    /**
+     * Creates an instance using the URI in the {@code pravega.uri} system property, if set.
+     */
+    public SetupUtils() {
+        this(System.getProperty("pravega.uri"));
+    }
+
+    /**
+     * @param externalUri controller URI of an external Pravega deployment, or null to run Pravega in-process.
+     */
+    public SetupUtils(String externalUri) {
+        log.info("SetupUtils constructor");
+        if (externalUri != null) {
+            log.info("Using Pravega services at {}.", externalUri);
+            gateway = new ExternalPravegaGateway(URI.create(externalUri));
+        } else {
+            log.info("Starting in-process Pravega services.");
+            gateway = new InProcPravegaGateway();
+        }
+        log.info("Done with constructor {}", gateway.toString());
+    }
+
+
+    /**
+     * Start all pravega related services required for the test deployment.
+     * Calling this again while already started is a logged no-op.
+     *
+     * @throws Exception on any errors.
+     */
+    public void startAllServices() throws Exception {
+        log.info("Starting all services");
+        if (!this.started.compareAndSet(false, true)) {
+            log.warn("Services already started, not attempting to start again");
+            return;
+        }
+        log.info("Starting gateway");
+        gateway.start();
+    }
+
+    /**
+     * Stop the pravega cluster and release all resources.
+     * Failures while stopping are logged and not rethrown.
+     *
+     * @throws Exception on any errors.
+     */
+    public void stopAllServices() throws Exception {
+        if (!this.started.compareAndSet(true, false)) {
+            log.warn("Services not yet started or already stopped, not attempting to stop");
+            return;
+        }
+
+        try {
+            gateway.stop();
+        } catch (Exception e) {
+            log.warn("Services did not stop cleanly (" + e.getMessage() + ")", e);
+        }
+    }
+
+    /**
+     * Fetch the client configuration with which to connect to the controller.
+     */
+    public ClientConfig getClientConfig() {
+        return this.gateway.getClientConfig();
+    }
+
+    /**
+     * Create the test stream.
+     *
+     * @param streamName     Name of the test stream.
+     * @param numSegments    Number of segments to be created for this stream.
+     *
+     * @throws Exception on any errors.
+     */
+    public void createTestStream(final String streamName, final int numSegments)
+            throws Exception {
+        Preconditions.checkState(this.started.get(), "Services not yet started");
+        Preconditions.checkNotNull(streamName);
+        Preconditions.checkArgument(numSegments > 0);
+
+        @Cleanup
+        StreamManager streamManager = StreamManager.create(getClientConfig());
+        streamManager.createScope(this.scope);
+        streamManager.createStream(this.scope, streamName,
+                StreamConfiguration.builder()
+                        .scalingPolicy(ScalingPolicy.fixed(numSegments))
+                        .build());
+        log.info("Created stream: " + streamName);
+    }
+
+    /**
+     * Get the stream.
+     *
+     * @param streamName     Name of the test stream.
+     *
+     * @return a Stream
+     */
+    public Stream getStream(final String streamName) {
+        return Stream.of(this.scope, streamName);
+    }
+
+    /**
+     * A gateway interface to Pravega for integration test purposes.
+     */
+    private interface PravegaGateway {
+        /**
+         * Starts the gateway.
+         */
+        void start() throws Exception;
+
+        /**
+         * Stops the gateway.
+         */
+        void stop() throws Exception;
+
+        /**
+         * Gets the client configuration with which to connect to the controller.
+         */
+        ClientConfig getClientConfig();
+    }
+
+    /**
+     * Gateway that runs ZooKeeper, one controller and one segment store in this JVM.
+     * It is an inner class because it reads the enclosing instance's auth/TLS/REST settings.
+     */
+    class InProcPravegaGateway implements PravegaGateway {
+
+        // The pravega cluster.
+        private InProcPravegaCluster inProcPravegaCluster = null;
+
+        @Override
+        public void start() throws Exception {
+            log.info("Starting gateway");
+            int zkPort = PortUtils.getAvailableListenPort();
+            int controllerPort = PortUtils.getAvailableListenPort();
+            int hostPort = PortUtils.getAvailableListenPort();
+            int restPort = PortUtils.getAvailableListenPort();
+
+            log.info("Building");
+            this.inProcPravegaCluster = InProcPravegaCluster.builder()
+                    .isInProcZK(true)
+                    .secureZK(enableTls) //configure ZK for security
+                    .zkUrl("localhost:" + zkPort)
+                    .zkPort(zkPort)
+                    .isInMemStorage(true)
+                    .isInProcController(true)
+                    .controllerCount(1)
+                    .restServerPort(restPort)
+                    .enableRestServer(enableRestServer)
+                    .isInProcSegmentStore(true)
+                    .segmentStoreCount(1)
+                    .containerCount(4)
+                    .enableMetrics(false)
+                    .enableAuth(enableAuth)
+                    .enableTls(enableTls)
+                    .build();
+            log.info("Done building");
+            this.inProcPravegaCluster.setControllerPorts(new int[]{controllerPort});
+            this.inProcPravegaCluster.setSegmentStorePorts(new int[]{hostPort});
+            this.inProcPravegaCluster.start();
+            log.info("Initialized Pravega Cluster");
+            log.info("Controller port is {}", controllerPort);
+            log.info("Host port is {}", hostPort);
+            log.info("REST server port is {}", restPort);
+        }
+
+        @Override
+        public void stop() throws Exception {
+            // The cluster is only assigned once start() has built it; nothing to close before that.
+            if (inProcPravegaCluster != null) {
+                inProcPravegaCluster.close();
+            }
+        }
+
+        @Override
+        public ClientConfig getClientConfig() {
+            log.info("Getting client config");
+            return ClientConfig.builder()
+                    .controllerURI(URI.create(inProcPravegaCluster.getControllerURI()))
+                    .build();
+        }
+    }
+
+    /**
+     * Gateway for an already running Pravega deployment; start and stop are no-ops.
+     */
+    class ExternalPravegaGateway implements PravegaGateway {
+
+        private final URI controllerUri;
+
+        public ExternalPravegaGateway(URI controllerUri) {
+            this.controllerUri = controllerUri;
+        }
+
+        @Override
+        public void start() throws Exception {
+        }
+
+        @Override
+        public void stop() throws Exception {
+        }
+
+        @Override
+        public ClientConfig getClientConfig() {
+            return ClientConfig.builder()
+                    .controllerURI(controllerUri)
+                    .build();
+        }
+    }
+
+    static class PortUtils {
+        // Linux uses ports from range 32768 - 61000.
+        private static final int BASE_PORT = 32768;
+        private static final int MAX_PORT_COUNT = 28232;
+        private static final AtomicInteger NEXT_PORT = new AtomicInteger(1);
+
+        /**
+         * A helper method to get a free port. Candidates are probed sequentially from a shared
+         * counter, so successive calls in this JVM return different ports. The port is released
+         * before returning, so another process could still take it before the caller binds it.
+         *
+         * @return free port.
+         * @throws IllegalStateException if no candidate port in the range could be bound.
+         */
+        public static int getAvailableListenPort() {
+            for (int i = 0; i < MAX_PORT_COUNT; i++) {
+                // floorMod keeps the offset non-negative even if the counter ever wraps around.
+                int candidatePort = BASE_PORT + Math.floorMod(NEXT_PORT.getAndIncrement(), MAX_PORT_COUNT);
+                try (ServerSocket serverSocket = new ServerSocket(candidatePort)) {
+                    return candidatePort;
+                } catch (IOException e) {
+                    // Do nothing. Try another port.
+                }
+            }
+            throw new IllegalStateException(
+                    String.format("Could not assign port in range %d - %d", BASE_PORT, MAX_PORT_COUNT + BASE_PORT));
+        }
+    }
+}
diff --git a/scripts/runAtLeastOnceApp.sh b/scripts/runAtLeastOnceApp.sh
new file mode 100755
index 00000000..becf6d8b
--- /dev/null
+++ b/scripts/runAtLeastOnceApp.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+set -ex
+ROOT_DIR="$(dirname "$0")/.."
+cd "${ROOT_DIR}"
+./gradlew pravega-client-examples:distTar
+cd pravega-client-examples/build/distributions
+tar -xf pravega-client-examples-0.10.0-SNAPSHOT.tar
+cd pravega-client-examples-0.10.0-SNAPSHOT
+
+NUM_INSTANCES=${1:-1}
+
+for i in $(seq -w 01 "$NUM_INSTANCES"); do
+  LOG_FILE=/tmp/atLeastOnceApp-$i.log
+  echo Logging to ${LOG_FILE}
+  INSTANCE_ID=$i bin/atLeastOnceApp >& ${LOG_FILE} &
+done
+
+echo Instances have been started as background jobs.
+
+tail -f /tmp/atLeastOnceApp-*.log