diff --git a/README.md b/README.md
index d4db67fbf..8be33f49a 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@ You may obtain a copy of the License at
# Pravega Benchmark Tool
-The Pravega benchmark tool used for the performance benchmarking of pravega streaming storage cluster.
+The Pravega benchmark tool is used for performance benchmarking of Pravega and Kafka streaming storage clusters.
This tool performs throughput and latency analysis for multiple producers/writers and consumers/readers of Pravega.
It also validates the end-to-end latency. The write and/or read latencies can be stored in a CSV file for later analysis.
At the end of the performance benchmark, this tool outputs the 50th, 75th, 95th, 99th, 99.9th and 99.99th latency percentiles.
@@ -46,7 +46,6 @@ Running Pravega benchmark tool locally:
```
/pravega-benchmark$ ./run/pravega-benchmark/bin/pravega-benchmark -help
-usage: pravega-benchmark
-consumers Number of consumers
-controller Controller URI
-events Number of events/records if 'time' not
@@ -58,7 +57,9 @@ usage: pravega-benchmark
number of events/records; Not
applicable, if both producers and
consumers are specified
+ -fork Use a fork-join pool (default: true)
-help Help message
+ -kafka Enable Kafka benchmarking
-producers Number of producers
-readcsv CSV file to record read latencies
-recreate If the stream is already existing, delete
@@ -172,3 +173,7 @@ The -throughput -1 specifies the writes tries to write the events at the maximum
### Recording the latencies to CSV files
User can use the options "-writecsv " to record the latencies of writers and "-readcsv " for readers.
In end-to-end latency mode, the user can supply only -readcsv to record the end-to-end latencies in the CSV file.
+
+### Kafka Benchmarking
+User can set the option "-kafka true" to run Kafka benchmarking. Unlike Pravega benchmarking, this tool does not create Kafka topics automatically; the user should create the topics manually before running a Kafka benchmark (see the sketch below). This tool treats the stream name as the topic name.
+
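Since the tool expects Kafka topics to already exist, one way to pre-create them is with the `AdminClient` that ships in the same `kafka-clients` dependency added in build.gradle below. A minimal sketch, assuming a local broker; the topic name must match the `-stream` option, and the partition/replica counts here are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateBenchmarkTopic {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        try (AdminClient admin = AdminClient.create(props)) {
            // The topic name must match the -stream option passed to the benchmark;
            // the partition count plays the role that segments play for Pravega.
            final NewTopic topic = new NewTopic("kafka-test-topic", 6, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```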
diff --git a/build.gradle b/build.gradle
index 101678b48..79a7516dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -31,7 +31,8 @@ buildscript {
compile "io.pravega:pravega-client:0.5.0",
"io.pravega:pravega-common:0.5.0",
"commons-cli:commons-cli:1.3.1",
- "org.apache.commons:commons-csv:1.5"
+ "org.apache.commons:commons-csv:1.5",
+ "org.apache.kafka:kafka-clients:2.3.0"
runtime "org.slf4j:slf4j-simple:1.7.14"
}
diff --git a/src/main/java/io/pravega/perf/KafkaReaderWorker.java b/src/main/java/io/pravega/perf/KafkaReaderWorker.java
new file mode 100644
index 000000000..c17222a16
--- /dev/null
+++ b/src/main/java/io/pravega/perf/KafkaReaderWorker.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.perf;
+
+import java.time.Duration;
+import java.util.Properties;
+import java.util.Arrays;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+/**
+ * Class for Kafka reader/consumer.
+ */
+public class KafkaReaderWorker extends ReaderWorker {
+ final private KafkaConsumer<byte[], byte[]> consumer;
+
+ KafkaReaderWorker(int readerId, int events, int secondsToRun,
+ long start, PerfStats stats, String partition,
+ int timeout, boolean writeAndRead, Properties consumerProps) {
+ super(readerId, events, secondsToRun, start, stats, partition, timeout, writeAndRead);
+
+ this.consumer = new KafkaConsumer<>(consumerProps);
+ this.consumer.subscribe(Arrays.asList(partition));
+ }
+
+ @Override
+ public byte[] readData() {
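+ // KafkaTest caps MAX_POLL_RECORDS at 1, so each poll returns at most one record.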
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(timeout));
+ if (records.isEmpty()) {
+ return null;
+ }
+ return records.iterator().next().value();
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+}
\ No newline at end of file
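`readData()` leans on the consumer configuration that `KafkaTest` builds, notably `MAX_POLL_RECORDS = 1`. A self-contained sketch of the same poll-and-measure pattern, with placeholder broker, group, and topic names:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PollLatencySketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // one record per poll, as in KafkaTest
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("kafka-test-topic"));
            for (int i = 0; i < 100; i++) {
                final long start = System.currentTimeMillis();
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    // The benchmark feeds (start, end, length) tuples like this into PerfStats.
                    System.out.printf("read %d bytes in %d ms%n",
                            r.value().length, System.currentTimeMillis() - start);
                }
            }
        }
    }
}
```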
diff --git a/src/main/java/io/pravega/perf/KafkaWriterWorker.java b/src/main/java/io/pravega/perf/KafkaWriterWorker.java
new file mode 100644
index 000000000..98a95e434
--- /dev/null
+++ b/src/main/java/io/pravega/perf/KafkaWriterWorker.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.perf;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/**
+ * Class for Kafka writer/producer.
+ */
+public class KafkaWriterWorker extends WriterWorker {
+ final private KafkaProducer<byte[], byte[]> producer;
+
+ KafkaWriterWorker(int sensorId, int events, int flushEvents,
+ int secondsToRun, boolean isRandomKey, int messageSize,
+ long start, PerfStats stats, String streamName,
+ int eventsPerSec, boolean writeAndRead, Properties producerProps) {
+
+ super(sensorId, events, flushEvents,
+ secondsToRun, isRandomKey, messageSize,
+ start, stats, streamName, eventsPerSec, writeAndRead);
+
+ this.producer = new KafkaProducer<>(producerProps);
+ }
+
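+ // The latency sample completes inside the producer's ack callback; send() returning only means the record was enqueued.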
+ @Override
+ public long recordWrite(byte[] data, TriConsumer record) {
+ final long time = System.currentTimeMillis();
+ producer.send(new ProducerRecord<>(streamName, data), (metadata, exception) -> {
+ final long endTime = System.currentTimeMillis();
+ record.accept(time, endTime, data.length);
+ });
+ return time;
+ }
+
+ @Override
+ public void writeData(byte[] data) {
+ producer.send(new ProducerRecord<>(streamName, data));
+ }
+
+ @Override
+ public void flush() {
+ producer.flush();
+ }
+
+ @Override
+ public synchronized void close() {
+ producer.close();
+ }
+}
\ No newline at end of file
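`recordWrite()` takes the start timestamp before `send()` and finishes the latency sample inside the acknowledgment callback, since `send()` only enqueues the record. A standalone sketch of that pattern, assuming a local broker and a placeholder topic:

```java
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class CallbackLatencySketch {
    public static void main(String[] args) throws InterruptedException {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // mirrors KafkaTest
        final byte[] payload = new byte[100];
        final CountDownLatch done = new CountDownLatch(1);
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            final long start = System.currentTimeMillis();
            // Latency is measured at acknowledgment, not when send() returns.
            producer.send(new ProducerRecord<>("kafka-test-topic", payload), (metadata, exception) -> {
                final long end = System.currentTimeMillis();
                if (exception == null) {
                    System.out.println("write latency: " + (end - start) + " ms");
                } else {
                    exception.printStackTrace();
                }
                done.countDown();
            });
            producer.flush();
            done.await();
        }
    }
}
```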
diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java
index 30008c43a..c5ba4a6b8 100644
--- a/src/main/java/io/pravega/perf/PerfStats.java
+++ b/src/main/java/io/pravega/perf/PerfStats.java
@@ -13,9 +13,9 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.LockSupport;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -38,7 +38,7 @@ public class PerfStats {
final private int messageSize;
final private int windowInterval;
final private ConcurrentLinkedQueue queue;
- final private ForkJoinPool executor;
+ final private ExecutorService executor;
@GuardedBy("this")
private Future ret;
@@ -66,13 +66,13 @@ private boolean isEnd() {
}
}
- public PerfStats(String action, int reportingInterval, int messageSize, String csvFile) {
+ public PerfStats(String action, int reportingInterval, int messageSize, String csvFile, ExecutorService executor) {
this.action = action;
this.messageSize = messageSize;
this.windowInterval = reportingInterval;
this.csvFile = csvFile;
+ this.executor = executor;
this.queue = new ConcurrentLinkedQueue<>();
- this.executor = new ForkJoinPool(1);
this.ret = null;
}
@@ -83,7 +83,7 @@ final private class QueueProcessor implements Callable {
final private static int NS_PER_MICRO = 1000;
final private static int MICROS_PER_MS = 1000;
final private static int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS;
- final private static int PARK_NS = NS_PER_MICRO;
+ final private static int PARK_NS = NS_PER_MS;
final private long startTime;
private QueueProcessor(long startTime) {
@@ -94,7 +94,7 @@ public Void call() throws IOException {
final TimeWindow window = new TimeWindow(action, startTime);
final LatencyWriter latencyRecorder = csvFile == null ? new LatencyWriter(action, messageSize, startTime) :
new CSVLatencyWriter(action, messageSize, startTime, csvFile);
- final int minWaitTimeMS = windowInterval / 50;
+ final int minWaitTimeMS = windowInterval / 10;
final long totalIdleCount = (NS_PER_MS / PARK_NS) * minWaitTimeMS;
boolean doWork = true;
long time = startTime;
@@ -351,7 +351,6 @@ public synchronized void shutdown(long endTime) throws ExecutionException, Inter
if (this.ret != null) {
queue.add(new TimeStamp(endTime));
ret.get();
- executor.shutdownNow();
queue.clear();
this.ret = null;
}
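The two constant changes above work together: each park of the queue processor now lasts about 1 ms instead of 1 µs, and the idle wait grows from windowInterval/50 to windowInterval/10, so the processor spins far less while waiting for samples. A worked sketch of the idle-budget arithmetic, assuming a 5-second reporting window (the actual REPORTINGINTERVAL value is outside this diff):

```java
public class IdleBudgetSketch {
    public static void main(String[] args) {
        final int NS_PER_MS = 1_000_000;
        final int windowInterval = 5000; // ms; assumed reporting window
        // New constants: PARK_NS = NS_PER_MS, minWaitTimeMS = windowInterval / 10.
        final long newIdleCount = ((long) NS_PER_MS / NS_PER_MS) * (windowInterval / 10);
        // Old constants: PARK_NS = 1000 ns (1 us), minWaitTimeMS = windowInterval / 50.
        final long oldIdleCount = ((long) NS_PER_MS / 1000) * (windowInterval / 50);
        System.out.println("new: " + newIdleCount + " parks of ~1 ms"); // 500
        System.out.println("old: " + oldIdleCount + " parks of ~1 us"); // 100000
    }
}
```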
diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java
index e90cc9638..93379fe81 100644
--- a/src/main/java/io/pravega/perf/PravegaPerfTest.java
+++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java
@@ -17,6 +17,12 @@
import io.pravega.client.stream.impl.ControllerImplConfig;
import io.pravega.client.stream.impl.ClientFactoryImpl;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
import java.io.IOException;
import java.net.URISyntaxException;
@@ -39,6 +45,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.Properties;
+import java.util.Locale;
/**
* Performance benchmark for Pravega.
@@ -80,6 +89,8 @@ public static void main(String[] args) {
"if -1, get the maximum throughput");
options.addOption("writecsv", true, "CSV file to record write latencies");
options.addOption("readcsv", true, "CSV file to record read latencies");
+ options.addOption("fork", true, "Use Fork join Pool");
+ options.addOption("kafka", true, "Kafka Benchmarking");
options.addOption("help", false, "Help message");
@@ -102,7 +113,7 @@ public static void main(String[] args) {
System.exit(0);
}
- final ForkJoinPool executor = new ForkJoinPool();
+ final ExecutorService executor = perfTest.getExecutor();
try {
final List producers = perfTest.getProducers();
@@ -152,7 +163,12 @@ public void run() {
public static Test createTest(long startTime, CommandLine commandline, Options options) {
try {
- return new PravegaTest(startTime, commandline);
+ boolean runKafka = Boolean.parseBoolean(commandline.getOptionValue("kafka", "false"));
+ if (runKafka) {
+ return new KafkaTest(startTime, commandline);
+ } else {
+ return new PravegaTest(startTime, commandline);
+ }
} catch (IllegalArgumentException ex) {
ex.printStackTrace();
final HelpFormatter formatter = new HelpFormatter();
@@ -171,6 +187,7 @@ static private abstract class Test {
static final int TIMEOUT = 1000;
static final String SCOPE = "Scope";
+ final ExecutorService executor;
final String controllerUri;
final int messageSize;
final String streamName;
@@ -178,6 +195,7 @@ static private abstract class Test {
final String scopeName;
final boolean recreate;
final boolean writeAndRead;
+ final boolean fork;
final int producerCount;
final int consumerCount;
final int segmentCount;
@@ -195,39 +213,36 @@ static private abstract class Test {
final PerfStats consumeStats;
final long startTime;
+
Test(long startTime, CommandLine commandline) throws IllegalArgumentException {
this.startTime = startTime;
- if (commandline.hasOption("controller")) {
- controllerUri = commandline.getOptionValue("controller");
- } else {
- controllerUri = null;
- }
+ controllerUri = commandline.getOptionValue("controller", null);
+ streamName = commandline.getOptionValue("stream", null);
+ producerCount = Integer.parseInt(commandline.getOptionValue("producers", "0"));
+ consumerCount = Integer.parseInt(commandline.getOptionValue("consumers", "0"));
- if (commandline.hasOption("producers")) {
- producerCount = Integer.parseInt(commandline.getOptionValue("producers"));
- } else {
- producerCount = 0;
+ if (controllerUri == null) {
+ throw new IllegalArgumentException("Error: Must specify Controller IP address");
}
- if (commandline.hasOption("consumers")) {
- consumerCount = Integer.parseInt(commandline.getOptionValue("consumers"));
- } else {
- consumerCount = 0;
+ if (streamName == null) {
+ throw new IllegalArgumentException("Error: Must specify stream Name");
}
- if (commandline.hasOption("events")) {
- events = Integer.parseInt(commandline.getOptionValue("events"));
- } else {
- events = 0;
+ if (producerCount == 0 && consumerCount == 0) {
+ throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers");
}
- if (commandline.hasOption("flush")) {
- int flushEvents = Integer.parseInt(commandline.getOptionValue("flush"));
- if (flushEvents > 0) {
- EventsPerFlush = flushEvents;
- } else {
- EventsPerFlush = Integer.MAX_VALUE;
- }
+ events = Integer.parseInt(commandline.getOptionValue("events", "0"));
+ messageSize = Integer.parseInt(commandline.getOptionValue("size", "0"));
+ scopeName = commandline.getOptionValue("scope", SCOPE);
+ transactionPerCommit = Integer.parseInt(commandline.getOptionValue("transactionspercommit", "0"));
+ fork = Boolean.parseBoolean(commandline.getOptionValue("fork", "true"));
+ writeFile = commandline.getOptionValue("writecsv", null);
+ readFile = commandline.getOptionValue("readcsv", null);
+ int flushEvents = Integer.parseInt(commandline.getOptionValue("flush", "0"));
+ if (flushEvents > 0) {
+ EventsPerFlush = flushEvents;
} else {
EventsPerFlush = Integer.MAX_VALUE;
}
@@ -240,30 +255,6 @@ static private abstract class Test {
runtimeSec = MAXTIME;
}
- if (commandline.hasOption("size")) {
- messageSize = Integer.parseInt(commandline.getOptionValue("size"));
- } else {
- messageSize = 0;
- }
-
- if (commandline.hasOption("stream")) {
- streamName = commandline.getOptionValue("stream");
- } else {
- streamName = null;
- }
-
- if (commandline.hasOption("scope")) {
- scopeName = commandline.getOptionValue("scope");
- } else {
- scopeName = SCOPE;
- }
-
- if (commandline.hasOption("transactionspercommit")) {
- transactionPerCommit = Integer.parseInt(commandline.getOptionValue("transactionspercommit"));
- } else {
- transactionPerCommit = 0;
- }
-
if (commandline.hasOption("segments")) {
segmentCount = Integer.parseInt(commandline.getOptionValue("segments"));
} else {
@@ -281,28 +272,11 @@ static private abstract class Test {
} else {
throughput = -1;
}
-
- if (commandline.hasOption("writecsv")) {
- writeFile = commandline.getOptionValue("writecsv");
- } else {
- writeFile = null;
- }
- if (commandline.hasOption("readcsv")) {
- readFile = commandline.getOptionValue("readcsv");
+ final int threadCount = producerCount + consumerCount + 6;
+ if (fork) {
+ executor = new ForkJoinPool(threadCount);
} else {
- readFile = null;
- }
-
- if (controllerUri == null) {
- throw new IllegalArgumentException("Error: Must specify Controller IP address");
- }
-
- if (streamName == null) {
- throw new IllegalArgumentException("Error: Must specify stream Name");
- }
-
- if (producerCount == 0 && consumerCount == 0) {
- throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers");
+ executor = Executors.newFixedThreadPool(threadCount);
}
if (recreate) {
@@ -321,7 +295,7 @@ static private abstract class Test {
if (writeAndRead) {
produceStats = null;
} else {
- produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile);
+ produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile, executor);
}
eventsPerProducer = (events + producerCount - 1) / producerCount;
@@ -346,7 +320,7 @@ static private abstract class Test {
} else {
action = "Reading";
}
- consumeStats = new PerfStats(action, REPORTINGINTERVAL, messageSize, readFile);
+ consumeStats = new PerfStats(action, REPORTINGINTERVAL, messageSize, readFile, executor);
eventsPerConsumer = events / consumerCount;
} else {
consumeStats = null;
@@ -376,6 +350,10 @@ public void shutdown(long endTime) {
}
}
+ public ExecutorService getExecutor() {
+ return executor;
+ }
+
public abstract void closeReaderGroup();
public abstract List getProducers();
@@ -470,6 +448,95 @@ public void closeReaderGroup() {
readerGroup.close();
}
}
+ }
+
+ static private class KafkaTest extends Test {
+ final private Properties producerConfig;
+ final private Properties consumerConfig;
+
+ KafkaTest(long startTime, CommandLine commandline) throws
+ IllegalArgumentException, URISyntaxException, InterruptedException, Exception {
+ super(startTime, commandline);
+ producerConfig = createProducerConfig();
+ consumerConfig = createConsumerConfig();
+ }
+
+ private Properties createProducerConfig() {
+ if (producerCount < 1) {
+ return null;
+ }
+ final Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, controllerUri);
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ // Enabling producer idempotence is a must for a fair comparison between Kafka and Pravega.
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ return props;
+ }
+
+ private Properties createConsumerConfig() {
+ if (consumerCount < 1) {
+ return null;
+ }
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, controllerUri);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
+ // Reading only committed events (READ_COMMITTED isolation) is a must for a fair comparison between Kafka and Pravega.
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
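+ // End-to-end mode tails the topic with a fixed consumer group; read-only mode replays it from the beginning with a unique per-run group.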
+ if (writeAndRead) {
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, streamName);
+ } else {
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, Long.toString(startTime));
+ }
+ return props;
+ }
+
+ public List getProducers() {
+ final List writers;
+
+ if (producerCount > 0) {
+ if (transactionPerCommit > 0) {
+ throw new IllegalArgumentException("Kafka Transactions are not supported");
+ } else {
+ writers = IntStream.range(0, producerCount)
+ .boxed()
+ .map(i -> new KafkaWriterWorker(i, eventsPerProducer,
+ EventsPerFlush, runtimeSec, false,
+ messageSize, startTime, produceStats,
+ streamName, eventsPerSec, writeAndRead, producerConfig))
+ .collect(Collectors.toList());
+ }
+ } else {
+ writers = null;
+ }
+ return writers;
+ }
+
+ public List getConsumers() throws URISyntaxException {
+ final List readers;
+ if (consumerCount > 0) {
+ readers = IntStream.range(0, consumerCount)
+ .boxed()
+ .map(i -> new KafkaReaderWorker(i, eventsPerConsumer,
+ runtimeSec, startTime, consumeStats,
+ streamName, TIMEOUT, writeAndRead, consumerConfig))
+ .collect(Collectors.toList());
+
+ } else {
+ readers = null;
+ }
+ return readers;
+ }
+
+ @Override
+ public void closeReaderGroup() {
+ }
}
+
}
diff --git a/src/main/java/io/pravega/perf/PravegaWriterWorker.java b/src/main/java/io/pravega/perf/PravegaWriterWorker.java
index 9ad293daf..5206006a8 100644
--- a/src/main/java/io/pravega/perf/PravegaWriterWorker.java
+++ b/src/main/java/io/pravega/perf/PravegaWriterWorker.java
@@ -43,7 +43,8 @@ public long recordWrite(byte[] data, TriConsumer record) {
final long time = System.currentTimeMillis();
ret = producer.writeEvent(data);
ret.thenAccept(d -> {
- record.accept(time, System.currentTimeMillis(), data.length);
+ final long endTime = System.currentTimeMillis();
+ record.accept(time, endTime, data.length);
});
return time;
}
diff --git a/src/main/java/io/pravega/perf/ReaderWorker.java b/src/main/java/io/pravega/perf/ReaderWorker.java
index f6d1d8b19..12229c19a 100644
--- a/src/main/java/io/pravega/perf/ReaderWorker.java
+++ b/src/main/java/io/pravega/perf/ReaderWorker.java
@@ -12,7 +12,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -74,7 +73,8 @@ public void EventsReader() throws IOException {
final long startTime = System.currentTimeMillis();
ret = readData();
if (ret != null) {
- stats.recordTime(startTime, System.currentTimeMillis(), ret.length);
+ final long endTime = System.currentTimeMillis();
+ stats.recordTime(startTime, endTime, ret.length);
i++;
}
}
@@ -116,7 +116,8 @@ public void EventsTimeReader() throws IOException {
time = System.currentTimeMillis();
ret = readData();
if (ret != null) {
- stats.recordTime(time, System.currentTimeMillis(), ret.length);
+ final long endTime = System.currentTimeMillis();
+ stats.recordTime(time, endTime, ret.length);
}
}
} finally {
diff --git a/src/main/java/io/pravega/perf/WriterWorker.java b/src/main/java/io/pravega/perf/WriterWorker.java
index 7a87f5b9c..54bce4a6a 100644
--- a/src/main/java/io/pravega/perf/WriterWorker.java
+++ b/src/main/java/io/pravega/perf/WriterWorker.java
@@ -194,7 +194,7 @@ private void EventsWriterTimeRW() throws InterruptedException, IOException {
for (int i = 0; (time - startTime) < msToRun; i++) {
time = System.currentTimeMillis();
- byte[] bytes = timeBuffer.putLong(0, System.currentTimeMillis()).array();
+ byte[] bytes = timeBuffer.putLong(0, time).array();
System.arraycopy(bytes, 0, payload, 0, bytes.length);
writeData(payload);
/*
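The change above stamps the loop's own `time` value into the payload rather than calling `System.currentTimeMillis()` a second time, so the embedded timestamp and the loop-exit check agree. In end-to-end mode the first eight bytes of each event carry the write time; a self-contained sketch of the round trip (the reader-side extraction is not part of this diff):

```java
import java.nio.ByteBuffer;

public class EmbeddedTimestampSketch {
    public static void main(String[] args) throws InterruptedException {
        final byte[] payload = new byte[100];
        // Writer side: stamp the send time into the first 8 bytes of the payload.
        final long writeTime = System.currentTimeMillis();
        ByteBuffer.wrap(payload).putLong(0, writeTime);

        Thread.sleep(5); // stand-in for the write -> read path

        // Reader side: recover the embedded timestamp to derive end-to-end latency.
        final long start = ByteBuffer.wrap(payload).getLong(0);
        final long endToEndMs = System.currentTimeMillis() - start;
        System.out.println("end-to-end latency: " + endToEndMs + " ms");
    }
}
```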