Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…Generator into develop
  • Loading branch information
Robin Zimmermann committed Mar 22, 2018
2 parents f972180 + a767871 commit ab23125
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 11 deletions.
5 changes: 3 additions & 2 deletions src/main/java/com/lc/df/kafka/client/KafkaClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.lc.df.kafka.client;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -61,7 +62,7 @@ public void run()
e.printStackTrace();
}

com.lc.df.kafka.client.Logger.logInfoMessage("Printing Final Stats: ........" + Kpi.getStats());
com.lc.df.kafka.client.Logger.logInfoMessage("Printing Final Stats: ........" + Kpi.getStats(true));
}

private void prepareConnectivity() throws Exception
Expand Down Expand Up @@ -176,6 +177,7 @@ private static void resetStats() {
Kpi.total_error_timeout = 0;
Kpi.total_error_unknown = 0;
Kpi.total_sent = 0;
Kpi.messageTypeCount = new HashMap<String, Integer>();
Kpi.total_success = 0;
Kpi.startSending = KafkaMsgSender.start_Time;
Kpi.stopSending = KafkaMsgSender.end_Time;
Expand Down Expand Up @@ -213,7 +215,6 @@ public void run()
t3 = t1 / t2;
long tps_last = Math.round(t3);
com.lc.df.kafka.client.Logger.logInfoMessage("stats1: " + "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent + ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/" + total_error_timeout + "/" + total_discarded + "/" + total_error_unknown +"), avg_latency:" + avg_latency);
com.lc.df.kafka.client.Logger.logInfoMessage("stats: " + Kpi.getStats());
}
});

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void run()

sendRequest(eventKey, newPayload);

stats.setSent();
stats.setSent(pl.getType());

spent = System.currentTimeMillis() - start;
count++;
Expand Down
47 changes: 42 additions & 5 deletions src/main/java/com/lc/df/kafka/client/Kpi.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
Expand All @@ -23,6 +26,7 @@ public class Kpi {
private static LinkedList<ReportHtml> reportHTML = new LinkedList<ReportHtml>();

public static int total_sent = 0;
public static Map<String, Integer> messageTypeCount = new HashMap<String, Integer>();
public static int total_success = 0;
public static int total_error = 0;
public static int total_error_timeout = 0;
Expand Down Expand Up @@ -77,11 +81,20 @@ public Kpi(String correlation) {
log = KafkaClient.getLogger();
this.correlation = correlation;
}
public void setSent() {
public void setSent(String messageType) {
sent = true;
timeSent = System.currentTimeMillis();
synchronized (lock_sent) {
total_sent++;
Integer count = messageTypeCount.get(messageType);
if (count == null)
{
messageTypeCount.put(messageType, 1);
}
else
{
messageTypeCount.put(messageType, count + 1);
}
}
}

Expand Down Expand Up @@ -183,6 +196,9 @@ private void logResult() {
}

public static String getStats() {
return getStats(false);
}
public static String getStats(boolean includeMessageCounts) {
long stopWatch = stopSending;

// if stopWatch = 0 it means threads are keep sending requests.
Expand Down Expand Up @@ -220,10 +236,31 @@ public static String getStats() {
KafkaClient.total_error_unknown = KafkaClient.total_error_unknown + total_error_unknown;
KafkaClient.avg_latency = KafkaClient.avg_latency + avg_latency;

return "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent
+ ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/"
+ total_error_timeout + "/" + total_discarded + "/" + total_error_unknown + "), avg_latency:"
+ avg_latency;
String returnStr = "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent
+ ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/"
+ total_error_timeout + "/" + total_discarded + "/" + total_error_unknown + "), avg_latency:"
+ avg_latency;

// If requested, add counts of each different message type. They are sorted in alphabetical
// order for convenience.
if (includeMessageCounts)
{
Stream<Map.Entry<String, Integer>> sorted = messageTypeCount.entrySet().stream()
.sorted(Map.Entry.comparingByKey());
Iterator<Map.Entry<String, Integer>> iter = sorted.iterator();
returnStr += "\n Message type counts:";
while (iter.hasNext()) {
Map.Entry<String, Integer> e = iter.next();
returnStr += String.format("\n %s=%d", e.getKey(), e.getValue());
}
Integer selectionUpdateCounts = messageTypeCount.get("selection-update");
if (selectionUpdateCounts != null)
{
returnStr += String.format("\n Percentage of messages for selection updates: %.1f%%", (float) selectionUpdateCounts / total_sent * 100);
}
}

return returnStr;
}

public long getLatency() {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/lc/df/kafka/client/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ else if (sendResults && !resultsComplete){
long eID = getRandomLongId(KafkaClientConfig.eventid_min, KafkaClientConfig.eventid_max);
String eventKey = Long.toString(eID);
String newPayload = getNewPayload(payload, eventKey, null,null,null,null, correlationId);
return new KafkaPayload(eventKey,"update-event",newPayload);
return new KafkaPayload(eventKey,"event-update",newPayload);
}
if ("M".equals(kPayload.getType()))
{
Expand All @@ -368,7 +368,7 @@ else if (sendResults && !resultsComplete){
String eventKey = Long.toString(eID);
String marketKey = Integer.toString(mID) + eventKey;
String newPayload = getNewPayload(payload, eventKey, marketKey,null,null,null, correlationId);
return new KafkaPayload(eventKey,"update-market",newPayload);
return new KafkaPayload(eventKey,"market-update",newPayload);
}
if ("S".equals(kPayload.getType()))
{
Expand All @@ -379,7 +379,7 @@ else if (sendResults && !resultsComplete){
String marketKey = Integer.toString(mID) + eventKey;
String selectionKey = Integer.toString(sID) + marketKey;
String newPayload = getNewPayload(payload, eventKey, marketKey,selectionKey,null,null, correlationId);
return new KafkaPayload(eventKey,"update-selection",newPayload);
return new KafkaPayload(eventKey,"selection-update",newPayload);
}
}
}
Expand Down

0 comments on commit ab23125

Please sign in to comment.