From 8fc4bed8531d773da26304faabb78986efe41693 Mon Sep 17 00:00:00 2001 From: Robin Zimmermann Date: Tue, 20 Mar 2018 12:49:19 -0700 Subject: [PATCH 1/2] Output count of message types in final stats report --- .../com/lc/df/kafka/client/KafkaClient.java | 3 +- .../lc/df/kafka/client/KafkaMsgSender.java | 2 +- src/main/java/com/lc/df/kafka/client/Kpi.java | 47 +++++++++++++++++-- .../java/com/lc/df/kafka/client/Utils.java | 6 +-- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/lc/df/kafka/client/KafkaClient.java b/src/main/java/com/lc/df/kafka/client/KafkaClient.java index e44e637..df90b5e 100644 --- a/src/main/java/com/lc/df/kafka/client/KafkaClient.java +++ b/src/main/java/com/lc/df/kafka/client/KafkaClient.java @@ -1,5 +1,6 @@ package com.lc.df.kafka.client; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -176,6 +177,7 @@ private static void resetStats() { Kpi.total_error_timeout = 0; Kpi.total_error_unknown = 0; Kpi.total_sent = 0; + Kpi.messageTypeCount = new HashMap(); Kpi.total_success = 0; Kpi.startSending = KafkaMsgSender.start_Time; Kpi.stopSending = KafkaMsgSender.end_Time; @@ -213,7 +215,6 @@ public void run() t3 = t1 / t2; long tps_last = Math.round(t3); com.lc.df.kafka.client.Logger.logInfoMessage("stats1: " + "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent + ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/" + total_error_timeout + "/" + total_discarded + "/" + total_error_unknown +"), avg_latency:" + avg_latency); - com.lc.df.kafka.client.Logger.logInfoMessage("stats: " + Kpi.getStats()); } }); diff --git a/src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java b/src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java index 1c0d2ab..03439e8 100644 --- a/src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java +++ b/src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java @@ -88,7 +88,7 @@ public void run() sendRequest(eventKey, newPayload); - stats.setSent(); + stats.setSent(pl.getType()); spent = System.currentTimeMillis() - start; count++; diff --git a/src/main/java/com/lc/df/kafka/client/Kpi.java b/src/main/java/com/lc/df/kafka/client/Kpi.java index 2ad8aa9..92b3ab5 100755 --- a/src/main/java/com/lc/df/kafka/client/Kpi.java +++ b/src/main/java/com/lc/df/kafka/client/Kpi.java @@ -5,10 +5,13 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; import java.util.LinkedList; +import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.log4j.Logger; @@ -23,6 +26,7 @@ public class Kpi { private static LinkedList reportHTML = new LinkedList(); public static int total_sent = 0; + public static Map messageTypeCount = new HashMap(); public static int total_success = 0; public static int total_error = 0; public static int total_error_timeout = 0; @@ -77,11 +81,20 @@ public Kpi(String correlation) { log = KafkaClient.getLogger(); this.correlation = correlation; } - public void setSent() { + public void setSent(String messageType) { sent = true; timeSent = System.currentTimeMillis(); synchronized (lock_sent) { total_sent++; + Integer count = messageTypeCount.get(messageType); + if (count == null) + { + messageTypeCount.put(messageType, 1); + } + else + { + messageTypeCount.put(messageType, count + 1); + } } } @@ -183,6 +196,9 @@ private void logResult() { } public static String getStats() { + return getStats(false); + } + public static String getStats(boolean includeMessageCounts) { long stopWatch = stopSending; // if stopWatch = 0 it means threads are keep sending requests. @@ -220,10 +236,31 @@ public static String getStats() { KafkaClient.total_error_unknown = KafkaClient.total_error_unknown + total_error_unknown; KafkaClient.avg_latency = KafkaClient.avg_latency + avg_latency; - return "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent - + ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/" - + total_error_timeout + "/" + total_discarded + "/" + total_error_unknown + "), avg_latency:" - + avg_latency; + String returnStr = "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent + + ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/" + + total_error_timeout + "/" + total_discarded + "/" + total_error_unknown + "), avg_latency:" + + avg_latency; + + // If requested, add counts of each different message type. They are sorted in alphabetical + // order for convenience. + if (includeMessageCounts) + { + Stream> sorted = messageTypeCount.entrySet().stream() + .sorted(Map.Entry.comparingByKey()); + Iterator> iter = sorted.iterator(); + returnStr += "\n Message type counts:"; + while (iter.hasNext()) { + Map.Entry e = iter.next(); + returnStr += String.format("\n %s=%d", e.getKey(), e.getValue()); + } + Integer selectionUpdateCounts = messageTypeCount.get("selection-update"); + if (selectionUpdateCounts != null) + { + returnStr += String.format("\n Percentage of messages for selection updates: %.1f%%", (float) selectionUpdateCounts / total_sent * 100); + } + } + + return returnStr; } public long getLatency() { diff --git a/src/main/java/com/lc/df/kafka/client/Utils.java b/src/main/java/com/lc/df/kafka/client/Utils.java index e71cf77..4924bb8 100644 --- a/src/main/java/com/lc/df/kafka/client/Utils.java +++ b/src/main/java/com/lc/df/kafka/client/Utils.java @@ -359,7 +359,7 @@ else if (sendResults && !resultsComplete){ long eID = getRandomLongId(KafkaClientConfig.eventid_min, KafkaClientConfig.eventid_max); String eventKey = Long.toString(eID); String newPayload = getNewPayload(payload, eventKey, null,null,null,null, correlationId); - return new KafkaPayload(eventKey,"update-event",newPayload); + return new KafkaPayload(eventKey,"event-update",newPayload); } if ("M".equals(kPayload.getType())) { @@ -368,7 +368,7 @@ else if (sendResults && !resultsComplete){ String eventKey = Long.toString(eID); String marketKey = Integer.toString(mID) + eventKey; String newPayload = getNewPayload(payload, eventKey, marketKey,null,null,null, correlationId); - return new KafkaPayload(eventKey,"update-market",newPayload); + return new KafkaPayload(eventKey,"market-update",newPayload); } if ("S".equals(kPayload.getType())) { @@ -379,7 +379,7 @@ else if (sendResults && !resultsComplete){ String marketKey = Integer.toString(mID) + eventKey; String selectionKey = Integer.toString(sID) + marketKey; String newPayload = getNewPayload(payload, eventKey, marketKey,selectionKey,null,null, correlationId); - return new KafkaPayload(eventKey,"update-selection",newPayload); + return new KafkaPayload(eventKey,"selection-update",newPayload); } } } From a7678713d2ab9b6915cbeb1da61349f66395a68d Mon Sep 17 00:00:00 2001 From: Robin Zimmermann Date: Tue, 20 Mar 2018 14:28:05 -0700 Subject: [PATCH 2/2] Fix to include message counts in final report --- src/main/java/com/lc/df/kafka/client/KafkaClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/lc/df/kafka/client/KafkaClient.java b/src/main/java/com/lc/df/kafka/client/KafkaClient.java index df90b5e..8d30a3a 100644 --- a/src/main/java/com/lc/df/kafka/client/KafkaClient.java +++ b/src/main/java/com/lc/df/kafka/client/KafkaClient.java @@ -62,7 +62,7 @@ public void run() e.printStackTrace(); } - com.lc.df.kafka.client.Logger.logInfoMessage("Printing Final Stats: ........" + Kpi.getStats()); + com.lc.df.kafka.client.Logger.logInfoMessage("Printing Final Stats: ........" + Kpi.getStats(true)); } private void prepareConnectivity() throws Exception