Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds #328

Closed
wants to merge 6 commits into from
Prev Previous commit
Next Next commit
[improve][test] Added message properties tests for batch and non-batc…
…h messages (apache#23473)

(cherry picked from commit 8de27a2)
(cherry picked from commit 986a4db)
codelipenghui authored and nikhil-ctds committed Oct 28, 2024
commit 5b3c084359e803747cc1ba0e4dc81ca1c385fb6c
Original file line number Diff line number Diff line change
@@ -1893,4 +1893,76 @@ public void testCreateMissingPartitions() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testCreateMissingPartitions";
assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName));
}

@Test
public void testPeekMessageWithProperties() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testPeekMessageWithProperties";
admin.topics().createNonPartitionedTopic(topicName);

// Test non-batch messages
@Cleanup
Producer<String> nonBatchProducer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.create();

Map<String, String> props1 = new HashMap<>();
props1.put("key1", "value1");
props1.put("KEY2", "VALUE2");
props1.put("KeY3", "VaLuE3");

nonBatchProducer.newMessage()
.properties(props1)
.value("non-batch-message")
.send();

Message<byte[]> peekedMessage = admin.topics().peekMessages(topicName, "sub-peek", 1).get(0);
assertEquals(new String(peekedMessage.getData()), "non-batch-message");
assertEquals(peekedMessage.getProperties().size(), 3);
assertEquals(peekedMessage.getProperties().get("key1"), "value1");
assertEquals(peekedMessage.getProperties().get("KEY2"), "VALUE2");
assertEquals(peekedMessage.getProperties().get("KeY3"), "VaLuE3");

admin.topics().truncate(topicName);

// Test batch messages
@Cleanup
Producer<String> batchProducer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.batchingMaxMessages(2)
.create();

Map<String, String> props2 = new HashMap<>();
props2.put("batch-key1", "batch-value1");
props2.put("BATCH-KEY2", "BATCH-VALUE2");
props2.put("BaTcH-kEy3", "BaTcH-vAlUe3");

batchProducer.newMessage()
.properties(props2)
.value("batch-message-1")
.sendAsync();

batchProducer.newMessage()
.properties(props2)
.value("batch-message-2")
.send();

List<Message<byte[]>> peekedMessages = admin.topics().peekMessages(topicName, "sub-peek", 2);
assertEquals(peekedMessages.size(), 2);

for (int i = 0; i < 2; i++) {
Message<byte[]> batchMessage = peekedMessages.get(i);
assertEquals(new String(batchMessage.getData()), "batch-message-" + (i + 1));
assertEquals(batchMessage.getProperties().size(),
3 + 2 // 3 properties from the message + 2 properties from the batch
);
assertEquals(batchMessage.getProperties().get("X-Pulsar-num-batch-message"), "2");
assertNotNull(batchMessage.getProperties().get("X-Pulsar-batch-size"));
assertEquals(batchMessage.getProperties().get("batch-key1"), "batch-value1");
assertEquals(batchMessage.getProperties().get("BATCH-KEY2"), "BATCH-VALUE2");
assertEquals(batchMessage.getProperties().get("BaTcH-kEy3"), "BaTcH-vAlUe3");
}
}
}