Skip to content

Commit

Permalink
[fix] [Perf] PerformanceProducer do not produce expected number of me…
Browse files Browse the repository at this point in the history
…ssages. (#19775)

Co-authored-by: tison <[email protected]>
  • Loading branch information
2 people authored and Technoboy- committed Jul 3, 2023
1 parent df440ae commit 46d057c
Showing 1 changed file with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ public static void main(String[] args) throws Exception {

oldTime = now;
}
PerfClientUtils.exit(0);
}

private static void executorShutdownNow() {
Expand Down Expand Up @@ -553,6 +554,7 @@ private static void runProducer(int producerId,
byte[] payloadBytes,
CountDownLatch doneLatch) {
PulsarClient client = null;
boolean produceEnough = false;
try {
// Now processing command line arguments
List<Future<Producer<byte[]>>> futures = new ArrayList<>();
Expand Down Expand Up @@ -617,14 +619,18 @@ private static void runProducer(int producerId,
AtomicLong numMessageSend = new AtomicLong(0);
Semaphore numMsgPerTxnLimit = new Semaphore(arguments.numMessagesPerTransaction);
while (true) {
if (produceEnough) {
break;
}
for (Producer<byte[]> producer : producers) {
if (arguments.testTime > 0) {
if (System.nanoTime() > testEndTime) {
log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) "
+ "--------------", arguments.testTime);
doneLatch.countDown();
Thread.sleep(5000);
PerfClientUtils.exit(0);
produceEnough = true;
break;
}
}

Expand All @@ -634,7 +640,8 @@ private static void runProducer(int producerId,
, numMessages);
doneLatch.countDown();
Thread.sleep(5000);
PerfClientUtils.exit(0);
produceEnough = true;
break;
}
}
rateLimiter.acquire();
Expand Down Expand Up @@ -763,10 +770,12 @@ private static void runProducer(int producerId,
} catch (Throwable t) {
log.error("Got error", t);
} finally {
if (!produceEnough) {
doneLatch.countDown();
}
if (null != client) {
try {
client.close();
PerfClientUtils.exit(1);
} catch (PulsarClientException e) {
log.error("Failed to close test client", e);
}
Expand Down

0 comments on commit 46d057c

Please sign in to comment.