Skip to content

Commit

Permalink
[fix][test] Fix PerformanceProducer send count error (apache#21706)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Dec 12, 2023
1 parent 51fc406 commit 19b9344
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ private static void runProducer(int producerId,
}
}
// Send messages on all topics/producers
long totalSent = 0;
AtomicLong totalSent = new AtomicLong(0);
AtomicLong numMessageSend = new AtomicLong(0);
Semaphore numMsgPerTxnLimit = new Semaphore(arguments.numMessagesPerTransaction);
while (true) {
Expand All @@ -587,7 +587,7 @@ private static void runProducer(int producerId,
}

if (numMessages > 0) {
if (totalSent++ >= numMessages) {
if (totalSent.get() >= numMessages) {
log.info("------------- DONE (reached the maximum number: {} of production) --------------"
, numMessages);
doneLatch.countDown();
Expand All @@ -604,7 +604,7 @@ private static void runProducer(int producerId,

if (arguments.payloadFilename != null) {
if (messageFormatter != null) {
payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent,
payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent.get(),
payloadByteList.get(ThreadLocalRandom.current().nextInt(payloadByteList.size())));
} else {
payloadData = payloadByteList.get(
Expand Down Expand Up @@ -642,13 +642,13 @@ private static void runProducer(int producerId,
if (msgKeyMode == MessageKeyGenerationMode.random) {
messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
} else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
messageBuilder.key(String.valueOf(totalSent));
messageBuilder.key(String.valueOf(totalSent.get()));
}
PulsarClient pulsarClient = client;
messageBuilder.sendAsync().thenRun(() -> {
bytesSent.add(payloadData.length);
messagesSent.increment();

totalSent.incrementAndGet();
totalMessagesSent.increment();
totalBytesSent.add(payloadData.length);

Expand Down

0 comments on commit 19b9344

Please sign in to comment.