From 7647f4dd33d63f5d609402ece4c7af53633b0b98 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 2 Nov 2023 08:44:54 +0800 Subject: [PATCH] [feat][broker] Add onMessagePublish method to BrokerInterceptor (#1) Signed-off-by: Zixuan Liu --- .../broker/intercept/BrokerInterceptor.java | 12 +++++++ .../BrokerInterceptorWithClassLoader.java | 8 +++++ .../pulsar/broker/service/Producer.java | 31 ++++++++++++------- .../intercept/BrokerInterceptorTest.java | 21 +++++++++++-- .../intercept/CounterBrokerInterceptor.java | 14 +++++++++ 5 files changed, 73 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java index cac1e66b53f79..dbe86ebcdabcb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java @@ -89,6 +89,18 @@ default void consumerCreated(ServerCnx cnx, Map metadata) { } + /** + * Intercept message when broker receive a send request. + * + * @param headersAndPayload entry's header and payload + * @param publishContext Publish Context + */ + default void onMessagePublish(Producer producer, + ByteBuf headersAndPayload, + Topic.PublishContext publishContext) { + + } + /** * Intercept after a message is produced. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index 0f8c182767c5d..8904bab03077a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -63,6 +63,14 @@ public void beforeSendMessage(Subscription subscription, } } + @Override + public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, + Topic.PublishContext publishContext) { + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext); + } + } + @Override public void producerCreated(ServerCnx cnx, Producer producer, Map metadata){ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 6ad07a70a3796..9b1f57de2ebaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -249,18 +249,24 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, boolean isMarker) { - topic.publishMessage(headersAndPayload, - MessagePublishContext.get(this, sequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker)); + MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, msgIn, + headersAndPayload.readableBytes(), batchSize, + isChunked, System.nanoTime(), isMarker); + if (brokerInterceptor != null) { + brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); + } + topic.publishMessage(headersAndPayload, messagePublishContext); } private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize, boolean isChunked, boolean isMarker) { - topic.publishMessage(headersAndPayload, - MessagePublishContext.get(this, lowestSequenceId, - highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker)); + MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, + highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, + isChunked, System.nanoTime(), isMarker); + if (brokerInterceptor != null) { + brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); + } + topic.publishMessage(headersAndPayload, messagePublishContext); } private boolean verifyChecksum(ByteBuf headersAndPayload) { @@ -733,9 +739,12 @@ public void checkEncryption() { public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize); - topic.publishTxnMessage(txnID, headersAndPayload, - MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker)); + MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, + headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker); + if (brokerInterceptor != null) { + brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); + } + topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext); } public SchemaVersion getSchemaVersion() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java index 37c0f8fddc185..93e3cc5e14e83 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java @@ -155,6 +155,23 @@ public void testConsumerCreation() throws PulsarClientException { Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1); } + @Test + public void testMessagePublishAndProduced() throws PulsarClientException { + BrokerInterceptor listener = pulsar.getBrokerInterceptor(); + Assert.assertTrue(listener instanceof CounterBrokerInterceptor); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("test-before-send-message") + .create(); + + assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0); + assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0); + producer.send("hello world"); + assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1); + assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1); + } + @Test public void testBeforeSendMessage() throws PulsarClientException { BrokerInterceptor listener = pulsar.getBrokerInterceptor(); @@ -170,10 +187,10 @@ public void testBeforeSendMessage() throws PulsarClientException { .subscriptionName("test") .subscribe(); - assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0); + assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0); assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),0); producer.send("hello world"); - assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1); + assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1); Message msg = consumer.receive(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java index 42d3740a5480c..f50a0931a1b5a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java @@ -55,6 +55,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { private AtomicInteger connectionCreationCount = new AtomicInteger(); private AtomicInteger producerCount = new AtomicInteger(); private AtomicInteger consumerCount = new AtomicInteger(); + private AtomicInteger messagePublishCount = new AtomicInteger(); private AtomicInteger messageCount = new AtomicInteger(); private AtomicInteger messageDispatchCount = new AtomicInteger(); private AtomicInteger messageAckCount = new AtomicInteger(); @@ -108,6 +109,15 @@ public void consumerCreated(ServerCnx cnx, consumerCount.incrementAndGet(); } + @Override + public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { + if (log.isDebugEnabled()) { + log.debug("Message broker received topic={}, producer={}", + producer.getTopic().getName(), producer.getProducerName()); + } + messagePublishCount.incrementAndGet(); + } + @Override public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId, @@ -222,6 +232,10 @@ public int getConsumerCount() { } public int getMessagePublishCount() { + return messagePublishCount.get(); + } + + public int getMessageProducedCount() { return messageCount.get(); }