diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java index cac1e66b53f79..dbe86ebcdabcb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java @@ -89,6 +89,18 @@ default void consumerCreated(ServerCnx cnx, Map metadata) { } + /** + * Intercept message when broker receive a send request. + * + * @param headersAndPayload entry's header and payload + * @param publishContext Publish Context + */ + default void onMessagePublish(Producer producer, + ByteBuf headersAndPayload, + Topic.PublishContext publishContext) { + + } + /** * Intercept after a message is produced. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index 0f8c182767c5d..8904bab03077a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -63,6 +63,14 @@ public void beforeSendMessage(Subscription subscription, } } + @Override + public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, + Topic.PublishContext publishContext) { + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext); + } + } + @Override public void producerCreated(ServerCnx cnx, Producer producer, Map metadata){ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 6ad07a70a3796..9b1f57de2ebaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -249,18 +249,24 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, boolean isMarker) { - topic.publishMessage(headersAndPayload, - MessagePublishContext.get(this, sequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker)); + MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, msgIn, + headersAndPayload.readableBytes(), batchSize, + isChunked, System.nanoTime(), isMarker); + if (brokerInterceptor != null) { + brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); + } + topic.publishMessage(headersAndPayload, messagePublishContext); } private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize, boolean isChunked, boolean isMarker) { - topic.publishMessage(headersAndPayload, - MessagePublishContext.get(this, lowestSequenceId, - highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker)); + MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, + highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, + isChunked, System.nanoTime(), isMarker); + if (brokerInterceptor != null) { + brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); + } + topic.publishMessage(headersAndPayload, messagePublishContext); } private boolean verifyChecksum(ByteBuf headersAndPayload) { @@ -733,9 +739,12 @@ public void checkEncryption() { public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize); - topic.publishTxnMessage(txnID, headersAndPayload, - MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker)); + MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, + headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker); + if (brokerInterceptor != null) { + brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); + } + topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext); } public SchemaVersion getSchemaVersion() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java index 37c0f8fddc185..13d6af250c698 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java @@ -18,6 +18,18 @@ */ package org.apache.pulsar.broker.intercept; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import lombok.Cleanup; import okhttp3.Call; import okhttp3.Callback; @@ -39,19 +51,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.testng.Assert.assertEquals; - @Test(groups = "broker") public class BrokerInterceptorTest extends ProducerConsumerBase { @@ -170,10 +169,10 @@ public void testBeforeSendMessage() throws PulsarClientException { .subscriptionName("test") .subscribe(); - assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0); + assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0); assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),0); producer.send("hello world"); - assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1); + assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1); Message msg = consumer.receive(); @@ -243,4 +242,19 @@ public void requestInterceptorFailedTest() { } } + @Test + public void testOnMessagePublish() throws PulsarClientException { + BrokerInterceptor listener = pulsar.getBrokerInterceptor(); + Assert.assertTrue(listener instanceof CounterBrokerInterceptor); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("test-on-message-publish") + .create(); + producer.send("hello world"); + + Awaitility.await().untilAsserted(() -> { + assertEquals(((CounterBrokerInterceptor) listener).getMessagePublishCount(), 1); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java index 42d3740a5480c..25bb8f8ff0f10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java @@ -41,8 +41,8 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.BaseCommand; -import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.intercept.InterceptException; import org.eclipse.jetty.server.Response; @@ -55,7 +55,8 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { private AtomicInteger connectionCreationCount = new AtomicInteger(); private AtomicInteger producerCount = new AtomicInteger(); private AtomicInteger consumerCount = new AtomicInteger(); - private AtomicInteger messageCount = new AtomicInteger(); + private AtomicInteger messagePublishCount = new AtomicInteger(); + private AtomicInteger messageProducedCount = new AtomicInteger(); private AtomicInteger messageDispatchCount = new AtomicInteger(); private AtomicInteger messageAckCount = new AtomicInteger(); @@ -65,7 +66,8 @@ public void reset() { connectionCreationCount.set(0); producerCount.set(0); consumerCount.set(0); - messageCount.set(0); + messagePublishCount.set(0); + messageProducedCount.set(0); messageDispatchCount.set(0); messageAckCount.set(0); } @@ -108,6 +110,15 @@ public void consumerCreated(ServerCnx cnx, consumerCount.incrementAndGet(); } + @Override + public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { + if (log.isDebugEnabled()) { + log.debug("Message publish topic={}, producer={}", + producer.getTopic().getName(), producer.getProducerName()); + } + messagePublishCount.incrementAndGet(); + } + @Override public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId, @@ -116,7 +127,7 @@ public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, log.debug("Message published topic={}, producer={}", producer.getTopic().getName(), producer.getProducerName()); } - messageCount.incrementAndGet(); + messageProducedCount.incrementAndGet(); } @Override @@ -222,7 +233,11 @@ public int getConsumerCount() { } public int getMessagePublishCount() { - return messageCount.get(); + return messagePublishCount.get(); + } + + public int getMessageProducedCount() { + return messageProducedCount.get(); } public int getMessageDispatchCount() {