diff --git a/.gitignore b/.gitignore index ea25619..1d147f2 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ logs target/ .classpath .project -.settings +.settings/ diff --git a/src/main/java/com/lc/df/kafka/client/KafkaClient.java b/src/main/java/com/lc/df/kafka/client/KafkaClient.java index e44e637..8d30a3a 100644 --- a/src/main/java/com/lc/df/kafka/client/KafkaClient.java +++ b/src/main/java/com/lc/df/kafka/client/KafkaClient.java @@ -1,5 +1,6 @@ package com.lc.df.kafka.client; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -61,7 +62,7 @@ public void run() e.printStackTrace(); } - com.lc.df.kafka.client.Logger.logInfoMessage("Printing Final Stats: ........" + Kpi.getStats()); + com.lc.df.kafka.client.Logger.logInfoMessage("Printing Final Stats: ........" + Kpi.getStats(true)); } private void prepareConnectivity() throws Exception @@ -176,6 +177,7 @@ private static void resetStats() { Kpi.total_error_timeout = 0; Kpi.total_error_unknown = 0; Kpi.total_sent = 0; + Kpi.messageTypeCount = new HashMap(); Kpi.total_success = 0; Kpi.startSending = KafkaMsgSender.start_Time; Kpi.stopSending = KafkaMsgSender.end_Time; @@ -213,7 +215,6 @@ public void run() t3 = t1 / t2; long tps_last = Math.round(t3); com.lc.df.kafka.client.Logger.logInfoMessage("stats1: " + "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent + ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/" + total_error_timeout + "/" + total_discarded + "/" + total_error_unknown +"), avg_latency:" + avg_latency); - com.lc.df.kafka.client.Logger.logInfoMessage("stats: " + Kpi.getStats()); } }); diff --git a/src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java b/src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java index 1c0d2ab..03439e8 100644 --- a/src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java +++ b/src/main/java/com/lc/df/kafka/client/KafkaMsgSender.java @@ -88,7 +88,7 @@ public void run() sendRequest(eventKey, newPayload); - stats.setSent(); + stats.setSent(pl.getType()); spent = System.currentTimeMillis() - start; count++; diff --git a/src/main/java/com/lc/df/kafka/client/Kpi.java b/src/main/java/com/lc/df/kafka/client/Kpi.java index 2ad8aa9..92b3ab5 100755 --- a/src/main/java/com/lc/df/kafka/client/Kpi.java +++ b/src/main/java/com/lc/df/kafka/client/Kpi.java @@ -5,10 +5,13 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; import java.util.LinkedList; +import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.log4j.Logger; @@ -23,6 +26,7 @@ public class Kpi { private static LinkedList reportHTML = new LinkedList(); public static int total_sent = 0; + public static Map messageTypeCount = new HashMap(); public static int total_success = 0; public static int total_error = 0; public static int total_error_timeout = 0; @@ -77,11 +81,20 @@ public Kpi(String correlation) { log = KafkaClient.getLogger(); this.correlation = correlation; } - public void setSent() { + public void setSent(String messageType) { sent = true; timeSent = System.currentTimeMillis(); synchronized (lock_sent) { total_sent++; + Integer count = messageTypeCount.get(messageType); + if (count == null) + { + messageTypeCount.put(messageType, 1); + } + else + { + messageTypeCount.put(messageType, count + 1); + } } } @@ -183,6 +196,9 @@ private void logResult() { } public static String getStats() { + return getStats(false); + } + public static String getStats(boolean includeMessageCounts) { long stopWatch = stopSending; // if stopWatch = 0 it means threads are keep sending requests. @@ -220,10 +236,31 @@ public static String getStats() { KafkaClient.total_error_unknown = KafkaClient.total_error_unknown + total_error_unknown; KafkaClient.avg_latency = KafkaClient.avg_latency + avg_latency; - return "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent - + ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/" - + total_error_timeout + "/" + total_discarded + "/" + total_error_unknown + "), avg_latency:" - + avg_latency; + String returnStr = "TPS(avg):" + tps_avg + ", TPS(last):" + tps_last + ", TIME: " + sending_time + ", sent:" + total_sent + + ", ok:" + total_success + ", error:" + total_error + "(c/t/d/o: " + total_error_connect + "/" + + total_error_timeout + "/" + total_discarded + "/" + total_error_unknown + "), avg_latency:" + + avg_latency; + + // If requested, add counts of each different message type. They are sorted in alphabetical + // order for convenience. + if (includeMessageCounts) + { + Stream> sorted = messageTypeCount.entrySet().stream() + .sorted(Map.Entry.comparingByKey()); + Iterator> iter = sorted.iterator(); + returnStr += "\n Message type counts:"; + while (iter.hasNext()) { + Map.Entry e = iter.next(); + returnStr += String.format("\n %s=%d", e.getKey(), e.getValue()); + } + Integer selectionUpdateCounts = messageTypeCount.get("selection-update"); + if (selectionUpdateCounts != null) + { + returnStr += String.format("\n Percentage of messages for selection updates: %.1f%%", (float) selectionUpdateCounts / total_sent * 100); + } + } + + return returnStr; } public long getLatency() { diff --git a/src/main/java/com/lc/df/kafka/client/Utils.java b/src/main/java/com/lc/df/kafka/client/Utils.java index 0ed7934..4924bb8 100644 --- a/src/main/java/com/lc/df/kafka/client/Utils.java +++ b/src/main/java/com/lc/df/kafka/client/Utils.java @@ -97,7 +97,7 @@ public static String getNewPayload(String payload,String eventid,String marketid if (selectionid != null) { int num = getRandomIntInRange(PRICE_NUM_LIMIT); - int den = getRandomIntInRange(PRICE_DEN_LIMIT - 1) + 1; //prevent divide by zero + int den = getRandomIntInRange(PRICE_DEN_LIMIT - 1) + 1; // prevent denominator of zero double decimal = 1 + ((double)num/(double)den); DecimalFormat f = new DecimalFormat("##.00"); pl = payload.replaceAll("\\$\\{selection\\}", selectionid) @@ -237,15 +237,16 @@ else if (currentMarketPrefixID > -1 && currentSelectionPrefixID > -1) String marketID = marketIDPrefixString + eventID; String selectionIDPrefixString = Integer.toString(currentSelectionPrefixID); String selectionID = selectionIDPrefixString + marketID; - if (currentSelectionPrefixID == KafkaClientConfig.selectionid_prefix_max) + currentSelectionPrefixID++; + if (currentSelectionPrefixID > KafkaClientConfig.selectionid_prefix_max) { - currentMarketPrefixID = -1; currentSelectionPrefixID = -1; - currentEventID++; - } - else - { - currentSelectionPrefixID++; + currentMarketPrefixID++; + if (currentMarketPrefixID > KafkaClientConfig.marketid_prefix_max) + { + currentMarketPrefixID = -1; + currentEventID++; + } } createsCompleted = currentEventID > KafkaClientConfig.eventid_max; return new KafkaPayload(eventID,"selection-create",getNewPayload(SEL_CREATE_PAYLOAD, eventID, marketID, selectionID,marketIDPrefixString,selectionIDPrefixString, correlationId)); @@ -308,6 +309,30 @@ else if (currentMarketPrefixID > -1 && currentSelectionPrefixID > -1) return null; } + private static long getRandomLongId(long min, long max) + { + if (min < max) + { + return ThreadLocalRandom.current().nextLong(min, max); + } + else + { + // Handle the case where the min and the max are the same. + return min; + } + } + private static int getRandomIntId(int min, int max) + { + if (min < max) + { + return ThreadLocalRandom.current().nextInt(min, max); + } + else + { + // Handle the case where the min and the max are the same. + return min; + } + } public static KafkaPayload getRandomPayload(String correlationId) { if (resultsComplete){ @@ -331,30 +356,30 @@ else if (sendResults && !resultsComplete){ String payload = kPayload.getPayload(); if ("E".equals(kPayload.getType())) { - long eID = ThreadLocalRandom.current().nextLong(KafkaClientConfig.eventid_min, KafkaClientConfig.eventid_max); + long eID = getRandomLongId(KafkaClientConfig.eventid_min, KafkaClientConfig.eventid_max); String eventKey = Long.toString(eID); String newPayload = getNewPayload(payload, eventKey, null,null,null,null, correlationId); - return new KafkaPayload(eventKey,"update-event",newPayload); + return new KafkaPayload(eventKey,"event-update",newPayload); } if ("M".equals(kPayload.getType())) { - long eID = ThreadLocalRandom.current().nextLong(KafkaClientConfig.eventid_min, KafkaClientConfig.eventid_max); - int mID = ThreadLocalRandom.current().nextInt(KafkaClientConfig.marketid_prefix_min, KafkaClientConfig.marketid_prefix_max); + long eID = getRandomLongId(KafkaClientConfig.eventid_min, KafkaClientConfig.eventid_max); + int mID = getRandomIntId(KafkaClientConfig.marketid_prefix_min, KafkaClientConfig.marketid_prefix_max); String eventKey = Long.toString(eID); String marketKey = Integer.toString(mID) + eventKey; String newPayload = getNewPayload(payload, eventKey, marketKey,null,null,null, correlationId); - return new KafkaPayload(eventKey,"update-market",newPayload); + return new KafkaPayload(eventKey,"market-update",newPayload); } if ("S".equals(kPayload.getType())) { - long eID = ThreadLocalRandom.current().nextLong(KafkaClientConfig.eventid_min, KafkaClientConfig.eventid_max); - int mID = ThreadLocalRandom.current().nextInt(KafkaClientConfig.marketid_prefix_min, KafkaClientConfig.marketid_prefix_max); - int sID = ThreadLocalRandom.current().nextInt(KafkaClientConfig.selectionid_prefix_min, KafkaClientConfig.selectionid_prefix_max); + long eID = getRandomLongId(KafkaClientConfig.eventid_min, KafkaClientConfig.eventid_max); + int mID = getRandomIntId(KafkaClientConfig.marketid_prefix_min, KafkaClientConfig.marketid_prefix_max); + int sID = getRandomIntId(KafkaClientConfig.selectionid_prefix_min, KafkaClientConfig.selectionid_prefix_max); String eventKey = Long.toString(eID); String marketKey = Integer.toString(mID) + eventKey; String selectionKey = Integer.toString(sID) + marketKey; String newPayload = getNewPayload(payload, eventKey, marketKey,selectionKey,null,null, correlationId); - return new KafkaPayload(eventKey,"update-selection",newPayload); + return new KafkaPayload(eventKey,"selection-update",newPayload); } } } @@ -376,4 +401,4 @@ public static synchronized void startResults(){ currentSelectionPrefixID = KafkaClientConfig.selectionid_prefix_min; } } -} \ No newline at end of file +}