diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 6441230fad87b..45b4ebf6e17cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.OpSendMsgStats; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.SendCallback; @@ -173,7 +174,7 @@ private static final class ProducerSendCallback implements SendCallback { private MessageImpl msg; @Override - public void sendComplete(Exception exception) { + public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) { if (exception != null) { log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 33e883ab9406a..b3d7546beed81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -60,6 +60,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.OpSendMsgStats; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.SendCallback; @@ -377,7 +378,7 @@ protected static final class ProducerSendCallback implements SendCallback { private MessageImpl msg; @Override - public void sendComplete(Exception exception) { + public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) { if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) { log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception); // cursor should be rewinded since it was incremented when readMoreEntries diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java index b8efdeb99696a..45f9a9c52e871 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java @@ -18,14 +18,18 @@ */ package org.apache.pulsar.client.impl; +import static org.apache.pulsar.client.impl.AbstractBatchMessageContainer.INITIAL_BATCH_BUFFER_SIZE; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MockBrokerService; +import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -38,13 +42,20 @@ @Test(groups = "broker-impl") @Slf4j -public class ProduceWithMessageIdTest { +public class ProduceWithMessageIdTest extends ProducerConsumerBase { MockBrokerService mockBrokerService; @BeforeClass(alwaysRun = true) - public void setup() { + public void setup() throws Exception { mockBrokerService = new MockBrokerService(); mockBrokerService.start(); + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); } @AfterClass(alwaysRun = true) @@ -86,7 +97,7 @@ public void testSend() throws Exception { AtomicBoolean result = new AtomicBoolean(false); producer.sendAsync(msg, new SendCallback() { @Override - public void sendComplete(Exception e) { + public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) { log.info("sendComplete", e); result.set(e == null); } @@ -115,4 +126,72 @@ public CompletableFuture getFuture() { // the result is true only if broker received right message id. Awaitility.await().untilTrue(result); } + + @Test + public void sendWithCallBack() throws Exception { + + int batchSize = 10; + + String topic = "persistent://public/default/testSendWithCallBack"; + ProducerImpl producer = + (ProducerImpl) pulsarClient.newProducer().topic(topic) + .enableBatching(true) + .batchingMaxMessages(batchSize) + .create(); + + CountDownLatch cdl = new CountDownLatch(1); + AtomicReference sendMsgStats = new AtomicReference<>(); + SendCallback sendComplete = new SendCallback() { + @Override + public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) { + log.info("sendComplete", e); + if (e == null){ + cdl.countDown(); + sendMsgStats.set(opSendMsgStats); + } + } + + @Override + public void addCallback(MessageImpl msg, SendCallback scb) { + + } + + @Override + public SendCallback getNextSendCallback() { + return null; + } + + @Override + public MessageImpl getNextMessage() { + return null; + } + + @Override + public CompletableFuture getFuture() { + return null; + } + }; + int totalReadabled = 0; + int totalUncompressedSize = 0; + for (int i = 0; i < batchSize; i++) { + MessageMetadata metadata = new MessageMetadata(); + ByteBuffer buffer = ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8)); + MessageImpl msg = MessageImpl.create(metadata, buffer, Schema.BYTES, topic); + msg.getDataBuffer().retain(); + totalReadabled += msg.getDataBuffer().readableBytes(); + totalUncompressedSize += msg.getUncompressedSize(); + producer.sendAsync(msg, sendComplete); + } + + cdl.await(); + OpSendMsgStats opSendMsgStats = sendMsgStats.get(); + Assert.assertEquals(opSendMsgStats.getUncompressedSize(), totalUncompressedSize + INITIAL_BATCH_BUFFER_SIZE); + Assert.assertEquals(opSendMsgStats.getSequenceId(), 0); + Assert.assertEquals(opSendMsgStats.getRetryCount(), 1); + Assert.assertEquals(opSendMsgStats.getBatchSizeByte(), totalReadabled); + Assert.assertEquals(opSendMsgStats.getNumMessagesInBatch(), batchSize); + Assert.assertEquals(opSendMsgStats.getHighestSequenceId(), batchSize-1); + Assert.assertEquals(opSendMsgStats.getTotalChunks(), 0); + Assert.assertEquals(opSendMsgStats.getChunkId(), -1); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index a3c9d1bc9ab48..44f1fb274655a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -229,7 +229,7 @@ public void discard(Exception ex) { try { // Need to protect ourselves from any exception being thrown in the future handler from the application if (firstCallback != null) { - firstCallback.sendComplete(ex); + firstCallback.sendComplete(ex, null); } if (batchedMessageMetadataAndPayload != null) { ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java new file mode 100644 index 0000000000000..dc28df50f2886 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + + +public interface OpSendMsgStats { + long getUncompressedSize(); + + long getSequenceId(); + + int getRetryCount(); + + long getBatchSizeByte(); + + int getNumMessagesInBatch(); + + long getHighestSequenceId(); + + int getTotalChunks(); + + int getChunkId(); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java new file mode 100644 index 0000000000000..41bb742776caa --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.Builder; + +@Builder +public class OpSendMsgStatsImpl implements OpSendMsgStats { + private long uncompressedSize; + private long sequenceId; + private int retryCount; + private long batchSizeByte; + private int numMessagesInBatch; + private long highestSequenceId; + private int totalChunks; + private int chunkId; + + @Override + public long getUncompressedSize() { + return uncompressedSize; + } + + @Override + public long getSequenceId() { + return sequenceId; + } + + @Override + public int getRetryCount() { + return retryCount; + } + + @Override + public long getBatchSizeByte() { + return batchSizeByte; + } + + @Override + public int getNumMessagesInBatch() { + return numMessagesInBatch; + } + + @Override + public long getHighestSequenceId() { + return highestSequenceId; + } + + @Override + public int getTotalChunks() { + return totalChunks; + } + + @Override + public int getChunkId() { + return chunkId; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 6d5a81454631f..5c46057ae308d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -414,7 +414,7 @@ public MessageImpl getNextMessage() { } @Override - public void sendComplete(Exception e) { + public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) { SendCallback loopingCallback = this; MessageImpl loopingMsg = currentMsg; while (loopingCallback != null) { @@ -424,7 +424,7 @@ public void sendComplete(Exception e) { } } - private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl msg) { + private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl msg) { long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback) ? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt; long latencyNanos = System.nanoTime() - createdAt; @@ -842,7 +842,7 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call log.warn("[{}] [{}] GetOrCreateSchema error", topic, producerName, t); if (t instanceof PulsarClientException.IncompatibleSchemaException) { msg.setSchemaState(MessageImpl.SchemaState.Broken); - callback.sendComplete((PulsarClientException.IncompatibleSchemaException) t); + callback.sendComplete(t, null); } } else { log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName); @@ -985,19 +985,19 @@ private boolean isValidProducerState(SendCallback callback, long sequenceId) { case Closing: case Closed: callback.sendComplete( - new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId)); + new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId), null); return false; case ProducerFenced: - callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced")); + callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"), null); return false; case Terminated: callback.sendComplete( - new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId)); + new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId), null); return false; case Failed: case Uninitialized: default: - callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId)); + callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId), null); return false; } } @@ -1012,20 +1012,20 @@ private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int pa } else { if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) { callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError( - "Producer send queue is full", sequenceId)); + "Producer send queue is full", sequenceId), null); return false; } if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) { semaphore.ifPresent(Semaphore::release); callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError( - "Client memory buffer is full", sequenceId)); + "Client memory buffer is full", sequenceId), null); return false; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - callback.sendComplete(new PulsarClientException(e, sequenceId)); + callback.sendComplete(new PulsarClientException(e, sequenceId), null); return false; } @@ -1302,7 +1302,7 @@ private void releaseSemaphoreForSendOp(OpSendMsg op) { private void completeCallbackAndReleaseSemaphore(long payloadSize, SendCallback callback, Exception exception) { semaphore.ifPresent(Semaphore::release); client.getMemoryLimitController().releaseMemory(payloadSize); - callback.sendComplete(exception); + callback.sendComplete(exception, null); } /** @@ -1595,7 +1595,17 @@ void sendComplete(final Exception e) { rpcLatencyHistogram.recordFailure(now - this.lastSentAt); } - callback.sendComplete(finalEx); + OpSendMsgStats opSendMsgStats = OpSendMsgStatsImpl.builder() + .uncompressedSize(uncompressedSize) + .sequenceId(sequenceId) + .retryCount(retryCount) + .batchSizeByte(batchSizeByte) + .numMessagesInBatch(numMessagesInBatch) + .highestSequenceId(highestSequenceId) + .totalChunks(totalChunks) + .chunkId(chunkId) + .build(); + callback.sendComplete(finalEx, opSendMsgStats); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java index 369bb34a29a79..f55d7ae79129c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java @@ -20,18 +20,21 @@ import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.classification.InterfaceStability; /** * */ +@InterfaceStability.Evolving public interface SendCallback { /** * invoked when send operation completes. * * @param e + * @param opSendMsgStats stats associated with the send operation */ - void sendComplete(Exception e); + void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats); /** * used to specify a callback to be invoked on completion of a send operation for individual messages sent in a