Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 89: Add Batch Reader Support #92

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

maddisondavid
Copy link
Contributor

Change log description
Adds support for using the BatchReader instead of the StreamingReader within consumers

Purpose of the change
Fixes #89

What the code does
When reading historic data from a stream data can be read using either the Streaming Reader or the Batch (bounded) Reader. This adds a new -batchreaders option that signifies that the stream should be consumed using the BatchClient. In this mode all the segments from with a Stream bound are supplied by the Pravega Client and the consumers are free to consume them in any order.

The existing consumer has been renamed PravegaStreamingReaderWorker and a new PravegaBatchReaderWorker has been introduced. Each reader is assigned a certain number of the Batch segments with an attempt to give each consumer an equal number of the segments. If the segment count is lower than the number of specified consumers then the consumer count is reduced so that there is one consumer per Batch segment.

 -batchreaders                       signifies the consumers should all be
                                     Batch Readers rather than Streaming
                                     readers

How to verify
When running with the -batchreaders option the test will log the consumer to segment assignments

>pravega-benchmark -controller tcp://localhost:9090 -consumers 10 -scope dave -stream test6 -segments 5 -time 20 -batchreaders
...
...
...
2020-01-12 13:16:49:359 +0000 [main] INFO io.pravega.perf.PravegaPerfTest - Limiting To 5 consumers due to small number of segment ranges
2020-01-12 13:16:49:359 +0000 [main] INFO io.pravega.perf.PravegaPerfTest - Segment Assignment 0 -> test6 0(0:2215032)
2020-01-12 13:16:49:359 +0000 [main] INFO io.pravega.perf.PravegaPerfTest - Segment Assignment 1 -> test6 1(0:2175864)
2020-01-12 13:16:49:359 +0000 [main] INFO io.pravega.perf.PravegaPerfTest - Segment Assignment 2 -> test6 2(0:2194224)
2020-01-12 13:16:49:359 +0000 [main] INFO io.pravega.perf.PravegaPerfTest - Segment Assignment 3 -> test6 3(0:2177904)
2020-01-12 13:16:49:359 +0000 [main] INFO io.pravega.perf.PravegaPerfTest - Segment Assignment 4 -> test6 4(0:2189328)

As the consumers progress through thier assigned segments ranges they will also log progress:

2020-01-12 13:16:49:379 +0000 [ForkJoinPool-2-worker-4] INFO io.pravega.perf.PravegaBatchReaderWorker - id:3 Starting Segment test6, 3(0:2177904)
2020-01-12 13:16:49:379 +0000 [ForkJoinPool-2-worker-3] INFO io.pravega.perf.PravegaBatchReaderWorker - id:2 Starting Segment test6, 2(0:2194224)
2020-01-12 13:16:49:379 +0000 [ForkJoinPool-2-worker-1] INFO io.pravega.perf.PravegaBatchReaderWorker - id:0 Starting Segment test6, 0(0:2215032)
2020-01-12 13:16:49:379 +0000 [ForkJoinPool-2-worker-5] INFO io.pravega.perf.PravegaBatchReaderWorker - id:4 Starting Segment test6, 4(0:2189328)
2020-01-12 13:16:49:379 +0000 [ForkJoinPool-2-worker-2] INFO io.pravega.perf.PravegaBatchReaderWorker - id:1 Starting Segment test6, 1(0:2175864)
...
...
2020-01-12 13:16:49:595 +0000 [ForkJoinPool-2-worker-2] INFO io.pravega.perf.PravegaBatchReaderWorker - id:1 Completed Segment test6, 1(0:2175864)
2020-01-12 13:16:49:595 +0000 [ForkJoinPool-2-worker-2] INFO io.pravega.perf.PravegaBatchReaderWorker - id:1 Completed all assigned assignedSegments
2020-01-12 13:16:49:601 +0000 [ForkJoinPool-2-worker-4] INFO io.pravega.perf.PravegaBatchReaderWorker - id:3 Completed Segment test6, 3(0:2177904)
2020-01-12 13:16:49:601 +0000 [ForkJoinPool-2-worker-3] INFO io.pravega.perf.PravegaBatchReaderWorker - id:2 Completed Segment test6, 2(0:2194224)
2020-01-12 13:16:49:601 +0000 [ForkJoinPool-2-worker-4] INFO io.pravega.perf.PravegaBatchReaderWorker - id:3 Completed all assigned assignedSegments
2020-01-12 13:16:49:601 +0000 [ForkJoinPool-2-worker-3] INFO io.pravega.perf.PravegaBatchReaderWorker - id:2 Completed all assigned assignedSegments
...
...

Signed-off-by: David Maddison [email protected]

@maddisondavid maddisondavid changed the title Issue 89 Add Batch Reader Support Issue 89: Add Batch Reader Support Jan 12, 2020
Copy link

@claudiofahey claudiofahey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I have a few comments.

return batchReaders ? getBatchConsumers() : getStreamingConsumers();
}

public List<ReaderWorker> getBatchConsumers() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A key difference between the batch and streaming readers in this benchmark is that there can be multiple benchmark processes reading together through a reader group. Adding more processes will increase the parallelism and data will not be read multiple times.
The batch reader does not perform this coordination so if one attempts to run multiple reader benchmark processes, each process will read the entire stream.
This behavior is reasonable but it should be documented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point and it was a subtly in the code that I missed, i.e. that the ReaderGroup name is devised from the steam name itself and therefore there is implicit coordination between multiple distributed processes.

https://github.com/pravega/pravega-benchmark/blob/master/src/main/java/io/pravega/perf/PravegaPerfTest.java#L290-L293

            if (recreate) {
                rdGrpName = streamName + startTime;
            } else {
                rdGrpName = streamName + "RdGrp";
            }

Obviously, as you point out, this is a bit harder to achieve with the BatchClient as the coordination is basically left up to the processor itself. I'll document this current restriction and find a way of distributing the processing in another PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you need to distribute the processing (in another PR), consider accepting parameters for the process index and the number of processes. Then each process can sort the available segment ranges and take a deterministic subset.

*
* @return A list of lists, each list representing the segments assigned to that consumer
*/
private List<List<SegmentRange>> assignSegmentsToConsumers(List<SegmentRange> segmentRanges, int consumers) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using com.google.common.collect.Lists.partition instead of this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I did use Google Lists.partition but the problem with that API is that partitions the list based on a specific partition size and NOT (as I'd assumed) by the number of required partitions.

This lead to getting into calculating the required size of partitions along with handling the cases where the number of SegmentRanges wasn't completely divisible by the number of consumers. In the end it felt a lot simpler to partition the list as done here.

Copy link
Contributor

@RaulGracia RaulGracia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, I left some comments.

src/main/java/io/pravega/perf/WorkerCompleteException.java Outdated Show resolved Hide resolved
@@ -100,6 +100,7 @@ public void EventsReaderRW() throws IOException {
i++;
}
}
} catch(WorkerCompleteException ignore) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to log anything here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because the Exception only exists to indicate to the logic that the reader has completed, it's not an actual exception or an error condition.

The PravegaBatchReaderWorker (which is the only worker that throws this exception) will log that it competed everything, so another log would just be redundant.

src/main/java/io/pravega/perf/PravegaPerfTest.java Outdated Show resolved Hide resolved
)
.collect(Collectors.toList());
} else {
readers = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this case possible? If the user does not specify consumers, we could take as default the number of segments or just 1. But if the user sets consumerCount <= 0, the tool should throw an error even before reaching this point telling the user that the input is incorrect, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic follows the same pattern as the streaming consumers. The problem is that the test doesn't know if consumers are required, it only knows that because consumers > 0. If consumers was defaulted to 1 then a test would always have a consumer, which is not desirable.

src/main/java/io/pravega/perf/PravegaPerfTest.java Outdated Show resolved Hide resolved
Signed-off-by: David Maddison <[email protected]>
@RaulGracia
Copy link
Contributor

@maddisondavid can you please have a look to the conflicts so we can merge this one?

@RaulGracia
Copy link
Contributor

@maddisondavid any updates on this one?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for reading with BatchClient
3 participants