Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 89: Add Batch Reader Support #92

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/main/java/io/pravega/perf/PravegaBatchReaderWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

package io.pravega.perf;
maddisondavid marked this conversation as resolved.
Show resolved Hide resolved

import io.pravega.client.BatchClientFactory;
import io.pravega.client.batch.SegmentIterator;
import io.pravega.client.batch.SegmentRange;
import io.pravega.client.stream.impl.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;

/**
 * A {@link ReaderWorker} that reads events through the Pravega batch API rather than a
 * streaming reader group. Each worker owns a fixed list of {@link SegmentRange}s and reads
 * them sequentially, signalling completion via {@link WorkerCompleteException} once every
 * assigned segment has been fully consumed.
 *
 * NOTE(review): unlike the streaming reader, the batch client performs no cross-process
 * coordination — if multiple benchmark processes run with batch readers, each process
 * reads the entire stream.
 */
public class PravegaBatchReaderWorker extends ReaderWorker {
    private static final Logger log = LoggerFactory.getLogger(PravegaBatchReaderWorker.class);

    // Factory used to open a SegmentIterator over each assigned range.
    private final BatchClientFactory batchClientFactory;
    // Remaining segment ranges this worker has not yet started reading.
    private final Iterator<SegmentRange> assignedSegments;

    // Iterator over the segment currently being read; null before the first segment
    // is opened and after the last one is exhausted.
    private SegmentIterator<byte[]> currentSegmentIterator;
    // Range backing currentSegmentIterator (used only for logging).
    private SegmentRange currentRange;
    // Set once all assigned segments are exhausted; subsequent readData() calls throw.
    private boolean finished;

    PravegaBatchReaderWorker(int readerId, int events, int secondsToRun, long start, PerfStats stats, String readerGrp, int timeout, boolean writeAndRead, BatchClientFactory batchClientFactory, List<SegmentRange> assignedSegments) {
        super(readerId, events, secondsToRun, start, stats, readerGrp, timeout, writeAndRead);
        this.batchClientFactory = batchClientFactory;

        this.assignedSegments = assignedSegments.iterator();
    }

    /**
     * Returns the next event payload, advancing across segment boundaries as needed.
     *
     * @return the next event, or {@code null} exactly once when all segments are done
     * @throws WorkerCompleteException on every call after the worker has finished
     */
    @Override
    public byte[] readData() throws WorkerCompleteException {
        if (finished) {
            throw new WorkerCompleteException(workerID);
        }

        // A loop (not a single 'if') is required here: a newly opened segment may be
        // empty, in which case we must close it and advance to the next one — otherwise
        // the next() call below would throw NoSuchElementException.
        while (currentSegmentIterator == null || !currentSegmentIterator.hasNext()) {
            if (currentSegmentIterator != null) {
                currentSegmentIterator.close();

                log.info("id:{} Completed Segment {}, {}({}:{})", workerID, currentRange.getStreamName(), currentRange.getSegmentId(), currentRange.getStartOffset(), currentRange.getEndOffset());
            }

            if (assignedSegments.hasNext()) {
                currentRange = assignedSegments.next();
                currentSegmentIterator = batchClientFactory.readSegment(currentRange, new ByteArraySerializer());

                log.info("id:{} Starting Segment {}, {}({}:{})", workerID, currentRange.getStreamName(), currentRange.getSegmentId(), currentRange.getStartOffset(), currentRange.getEndOffset());
            } else {
                log.info("id:{} Completed all assigned segments", workerID);
                currentSegmentIterator = null;
                finished = true;
                return null;
            }
        }

        return currentSegmentIterator.next();
    }

    /** Releases the iterator over the in-progress segment, if any. */
    @Override
    public void close() {
        if (currentSegmentIterator != null) {
            currentSegmentIterator.close();
        }
    }
}
86 changes: 83 additions & 3 deletions src/main/java/io/pravega/perf/PravegaPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

package io.pravega.perf;

import io.pravega.client.BatchClientFactory;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.batch.SegmentRange;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.impl.ClientFactoryImpl;
import io.pravega.client.stream.impl.ControllerImpl;
Expand All @@ -23,10 +25,13 @@
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -43,6 +48,8 @@
* Data format is in comma separated format as following: {TimeStamp, Sensor Id, Location, TempValue }.
*/
public class PravegaPerfTest {
private static Logger log = LoggerFactory.getLogger(PravegaPerfTest.class);

final static String BENCHMARKNAME = "pravega-benchmark";

public static void main(String[] args) {
Expand Down Expand Up @@ -88,6 +95,8 @@ public static void main(String[] args) {
"If -1 (default), watermarks will not be read.\n" +
"If >0, watermarks will be read with a period of this many milliseconds.");

options.addOption("batchreaders", false, "Use batch readers rather than Streaming readers for consumers");

options.addOption("help", false, "Help message");

parser = new DefaultParser();
Expand Down Expand Up @@ -186,7 +195,7 @@ static private abstract class Test {
final boolean recreate;
final boolean writeAndRead;
final int producerCount;
final int consumerCount;
int consumerCount;
final int segmentCount;
final int events;
final int eventsPerSec;
Expand All @@ -204,6 +213,7 @@ static private abstract class Test {
final boolean enableConnectionPooling;
final long writeWatermarkPeriodMillis;
final long readWatermarkPeriodMillis;
final boolean batchReaders;

Test(long startTime, CommandLine commandline) throws IllegalArgumentException {
this.startTime = startTime;
Expand Down Expand Up @@ -303,6 +313,8 @@ static private abstract class Test {
readFile = null;
}

batchReaders = commandline.hasOption("batchreaders");

maddisondavid marked this conversation as resolved.
Show resolved Hide resolved
enableConnectionPooling = Boolean.parseBoolean(commandline.getOptionValue("enableConnectionPooling", "true"));

writeWatermarkPeriodMillis = Long.parseLong(commandline.getOptionValue("writeWatermarkPeriodMillis", "-1"));
Expand Down Expand Up @@ -403,6 +415,7 @@ static private class PravegaTest extends Test {
final PravegaStreamHandler streamHandle;
final EventStreamClientFactory factory;
final ReaderGroup readerGroup;
final BatchClientFactory batchClientFactory;

PravegaTest(long startTime, CommandLine commandline) throws
IllegalArgumentException, URISyntaxException, InterruptedException, Exception {
Expand All @@ -426,8 +439,15 @@ static private class PravegaTest extends Test {
}
}
if (consumerCount > 0) {
readerGroup = streamHandle.createReaderGroup(!writeAndRead);
if (batchReaders) {
batchClientFactory = streamHandle.newBatchClientFactory();
readerGroup = null;
} else {
batchClientFactory = null;
readerGroup = streamHandle.createReaderGroup(!writeAndRead);
}
} else {
batchClientFactory = null;
readerGroup = null;
}

Expand Down Expand Up @@ -468,11 +488,43 @@ public List<WriterWorker> getProducers() {
}

public List<ReaderWorker> getConsumers() throws URISyntaxException {
return batchReaders ? getBatchConsumers() : getStreamingConsumers();
}

public List<ReaderWorker> getBatchConsumers() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A key difference between the batch and streaming readers in this benchmark is that there can be multiple benchmark processes reading together through a reader group. Adding more processes will increase the parallelism and data will not be read multiple times.
The batch reader does not perform this coordination so if one attempts to run multiple reader benchmark processes, each process will read the entire stream.
This behavior is reasonable but it should be documented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point and it was a subtlety in the code that I missed, i.e. that the ReaderGroup name is derived from the stream name itself and therefore there is implicit coordination between multiple distributed processes.

https://github.com/pravega/pravega-benchmark/blob/master/src/main/java/io/pravega/perf/PravegaPerfTest.java#L290-L293

            if (recreate) {
                rdGrpName = streamName + startTime;
            } else {
                rdGrpName = streamName + "RdGrp";
            }

Obviously, as you point out, this is a bit harder to achieve with the BatchClient as the coordination is basically left up to the processor itself. I'll document this current restriction and find a way of distributing the processing in another PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you need to distribute the processing (in another PR), consider accepting parameters for the process index and the number of processes. Then each process can sort the available segment ranges and take a deterministic subset.

final List<ReaderWorker> readers;
if (consumerCount > 0) {
List<SegmentRange> segmentRanges = streamHandle.getBatchSegmentRanges(batchClientFactory);

if (consumerCount > segmentRanges.size()) {
consumerCount = segmentRanges.size();

log.info("Limiting To {} consumers due to small number of segment ranges", consumerCount);
}

List<List<SegmentRange>> assignedRanges = assignSegmentsToConsumers(segmentRanges, consumerCount);

readers = IntStream.range(0, consumerCount)
.boxed()
.map(i ->
new PravegaBatchReaderWorker(i, eventsPerConsumer,
runtimeSec, startTime, consumeStats,
rdGrpName, TIMEOUT, writeAndRead, batchClientFactory, assignedRanges.get(i))
)
.collect(Collectors.toList());
} else {
readers = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this case possible? If the user does not specify consumers, we could take as default the number of segments or just 1. But if the user sets consumerCount <= 0, the tool should throw an error even before reaching this point telling the user that the input is incorrect, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic follows the same pattern as the streaming consumers. The problem is that the test doesn't know if consumers are required, it only knows that because consumers > 0. If consumers was defaulted to 1 then a test would always have a consumer, which is not desirable.

}
return readers;

}

public List<ReaderWorker> getStreamingConsumers() throws URISyntaxException {
final List<ReaderWorker> readers;
if (consumerCount > 0) {
readers = IntStream.range(0, consumerCount)
.boxed()
.map(i -> new PravegaReaderWorker(i, eventsPerConsumer,
.map(i -> new PravegaStreamingReaderWorker(i, eventsPerConsumer,
runtimeSec, startTime, consumeStats,
rdGrpName, TIMEOUT, writeAndRead, factory,
io.pravega.client.stream.Stream.of(scopeName, streamName),
Expand All @@ -491,5 +543,33 @@ public void closeReaderGroup() {
}
}

/**
* Chunks the list of segment ranges among the consumers. If the number of segment ranges is
* divisible by the number of consumers then each consumer will receive an equal number of segments, otherwise
* some consumers may receive more segments than others.
*
* @return A list of lists, each list representing the segments assigned to that consumer
*/
private List<List<SegmentRange>> assignSegmentsToConsumers(List<SegmentRange> segmentRanges, int consumers) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using com.google.common.collect.Lists.partition instead of this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I did use Google Lists.partition but the problem with that API is that it partitions the list based on a specific partition size and NOT (as I'd assumed) by the number of required partitions.

This led to calculating the required size of the partitions along with handling the cases where the number of SegmentRanges wasn't completely divisible by the number of consumers. In the end it felt a lot simpler to partition the list as done here.

List<List<SegmentRange>> results = new ArrayList<>();
for (int f=0; f < consumers; f++) {
results.add(new ArrayList<>());
}

for (int f=0;f < segmentRanges.size(); f++) {
int consumerId = f % consumers;

if (results.size() < f) {
results.add(new ArrayList<>());
}

SegmentRange segmentRange = segmentRanges.get(f);
results.get(consumerId).add(segmentRange);

log.info("Segment Assignment {} -> {} {}({}:{})", consumerId, segmentRange.getStreamName(), segmentRange.getSegmentId(), segmentRange.getStartOffset(), segmentRange.getEndOffset());
}

return results;
}
}
}
21 changes: 21 additions & 0 deletions src/main/java/io/pravega/perf/PravegaStreamHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
package io.pravega.perf;

import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -21,7 +23,12 @@
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.StreamSupport;

import com.google.common.collect.Streams;
import io.pravega.client.BatchClientFactory;
import io.pravega.client.batch.SegmentRange;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.ControllerImpl;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.StreamConfiguration;
Expand Down Expand Up @@ -139,6 +146,8 @@ void recreate() throws InterruptedException, ExecutionException, TimeoutExceptio
}
}



ReaderGroup createReaderGroup(boolean reset) throws URISyntaxException {
if (readerGroupManager == null) {
readerGroupManager = ReaderGroupManager.withScope(scope,
Expand All @@ -154,6 +163,18 @@ ReaderGroup createReaderGroup(boolean reset) throws URISyntaxException {
return rdGroup;
}

/**
 * Builds a {@link BatchClientFactory} bound to this handler's scope and controller
 * endpoint, for segment-level (batch) reads of the stream.
 *
 * @return a freshly created batch client factory
 */
public BatchClientFactory newBatchClientFactory() {
    final URI controller = URI.create(controllerUri);
    return BatchClientFactory.withScope(
            scope,
            ClientConfig.builder().controllerURI(controller).build());
}

/**
 * Enumerates every segment range of this stream via the batch API, covering the whole
 * stream (unbounded start and end stream cuts).
 *
 * @param batchFactory batch client used to list the stream's segments
 * @return all segment ranges of the stream, in the order the iterator yields them
 */
public List<SegmentRange> getBatchSegmentRanges(BatchClientFactory batchFactory) {
    final Iterator<SegmentRange> segments =
            batchFactory.getSegments(Stream.of(scope, stream), StreamCut.UNBOUNDED, StreamCut.UNBOUNDED)
                    .getIterator();

    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(segments, 0), false)
            .collect(Collectors.toList());
}

void deleteReaderGroup() {
try {
readerGroupManager.deleteReaderGroup(rdGrpName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
/**
* Class for Pravega reader/consumer.
*/
public class PravegaReaderWorker extends ReaderWorker {
private static Logger log = LoggerFactory.getLogger(PravegaReaderWorker.class);
public class PravegaStreamingReaderWorker extends ReaderWorker {
private static Logger log = LoggerFactory.getLogger(PravegaStreamingReaderWorker.class);

private final EventStreamReader<byte[]> reader;
private final Stream stream;
Expand All @@ -38,10 +38,11 @@ public class PravegaReaderWorker extends ReaderWorker {
*
* @param readWatermarkPeriodMillis If >0, watermarks will be read with a period of this many milliseconds.
*/
PravegaReaderWorker(int readerId, int events, int secondsToRun,
long start, PerfStats stats, String readergrp,
int timeout, boolean writeAndRead, EventStreamClientFactory factory,
Stream stream, long readWatermarkPeriodMillis) {

PravegaStreamingReaderWorker(int readerId, int events, int secondsToRun,
maddisondavid marked this conversation as resolved.
Show resolved Hide resolved
long start, PerfStats stats, String readergrp,
int timeout, boolean writeAndRead, EventStreamClientFactory factory,
Stream stream, long readWatermarkPeriodMillis) {
super(readerId, events, secondsToRun, start, stats, readergrp, timeout, writeAndRead);

final String readerSt = Integer.toString(readerId);
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/io/pravega/perf/ReaderWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -47,7 +46,7 @@ private Performance createBenchmark() {
/**
* read the data.
*/
public abstract byte[] readData();
public abstract byte[] readData() throws WorkerCompleteException;

/**
* close the consumer/reader.
Expand Down Expand Up @@ -78,6 +77,7 @@ public void EventsReader() throws IOException {
i++;
}
}
} catch(WorkerCompleteException ignore) {
} finally {
close();
}
Expand All @@ -100,6 +100,7 @@ public void EventsReaderRW() throws IOException {
i++;
}
}
} catch(WorkerCompleteException ignore) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to log anything here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because the Exception only exists to indicate to the logic that the reader has completed, it's not an actual exception or an error condition.

The PravegaBatchReaderWorker (which is the only worker that throws this exception) will log that it completed everything, so another log would just be redundant.

} finally {
close();
}
Expand All @@ -119,6 +120,7 @@ public void EventsTimeReader() throws IOException {
stats.recordTime(time, System.currentTimeMillis(), ret.length);
}
}
} catch(WorkerCompleteException ignore) {
} finally {
close();
}
Expand All @@ -141,6 +143,7 @@ public void EventsTimeReaderRW() throws IOException {
stats.recordTime(start, time, ret.length);
}
}
} catch(WorkerCompleteException ignore) {
} finally {
close();
}
Expand Down
Loading