Skip to content

Commit

Permalink
[feat][broker] Add onMessagePublish method to BrokerInterceptor
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Nov 1, 2023
1 parent 7cb32a2 commit e29ff65
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ default void consumerCreated(ServerCnx cnx,
Map<String, String> metadata) {
}

/**
* Intercept message when broker receive a send request.
*
* @param headersAndPayload entry's header and payload
* @param publishContext Publish Context
*/
default void onMessagePublish(Producer producer,
ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {

}

/**
* Intercept after a message is produced.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public void beforeSendMessage(Subscription subscription,
}
}

@Override
public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext);
}
}

@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,24 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
boolean isMarker) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker));
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker);
if (brokerInterceptor != null) {
brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
long batchSize, boolean isChunked, boolean isMarker) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker));
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker);
if (brokerInterceptor != null) {
brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}

private boolean verifyChecksum(ByteBuf headersAndPayload) {
Expand Down Expand Up @@ -733,9 +739,12 @@ public void checkEncryption() {
public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize);
topic.publishTxnMessage(txnID, headersAndPayload,
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker));
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker);
if (brokerInterceptor != null) {
brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext);
}

public SchemaVersion getSchemaVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,23 @@ public void testConsumerCreation() throws PulsarClientException {
Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1);
}

@Test
public void testMessagePublishAndProduced() throws PulsarClientException {
BrokerInterceptor listener = pulsar.getBrokerInterceptor();
Assert.assertTrue(listener instanceof CounterBrokerInterceptor);

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("test-before-send-message")
.create();

assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0);
assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0);
producer.send("hello world");
assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1);
assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1);
}

@Test
public void testBeforeSendMessage() throws PulsarClientException {
BrokerInterceptor listener = pulsar.getBrokerInterceptor();
Expand All @@ -170,10 +187,10 @@ public void testBeforeSendMessage() throws PulsarClientException {
.subscriptionName("test")
.subscribe();

assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0);
assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0);
assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),0);
producer.send("hello world");
assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1);
assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1);

Message<String> msg = consumer.receive();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
private AtomicInteger connectionCreationCount = new AtomicInteger();
private AtomicInteger producerCount = new AtomicInteger();
private AtomicInteger consumerCount = new AtomicInteger();
private AtomicInteger messagePublishCount = new AtomicInteger();
private AtomicInteger messageCount = new AtomicInteger();
private AtomicInteger messageDispatchCount = new AtomicInteger();
private AtomicInteger messageAckCount = new AtomicInteger();
Expand Down Expand Up @@ -108,6 +109,15 @@ public void consumerCreated(ServerCnx cnx,
consumerCount.incrementAndGet();
}

@Override
public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
if (log.isDebugEnabled()) {
log.debug("Message broker received topic={}, producer={}",
producer.getTopic().getName(), producer.getProducerName());
}
messagePublishCount.incrementAndGet();
}

@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId,
Expand Down Expand Up @@ -222,6 +232,10 @@ public int getConsumerCount() {
}

public int getMessagePublishCount() {
return messagePublishCount.get();
}

public int getMessageProducedCount() {
return messageCount.get();
}

Expand Down

0 comments on commit e29ff65

Please sign in to comment.