From 0960047a662b03ab924a7e8c083f15f4fefa4853 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 10 Apr 2023 21:00:28 +0800 Subject: [PATCH] Restore the tests related to MultiMessageIdImpl --- .../pulsar/client/api/TopicReaderTest.java | 10 +- .../client/impl/TopicsConsumerImplTest.java | 37 +++ .../client/impl/BatchMessageIdImplTest.java | 13 + .../client/impl/MessageIdCompareToTest.java | 222 ++++++++++++++++++ 4 files changed, 279 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 8b82b7641cd56..424081b904c81 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -1092,12 +1092,16 @@ public void testHasMessageAvailable() throws Exception { assertFalse(messageId instanceof BatchMessageIdImpl); ReaderImpl reader = (ReaderImpl)pulsarClient.newReader().topic(topicName) .startMessageId(messageId).startMessageIdInclusive().create(); + MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId(); + assertFalse(lastMsgId instanceof BatchMessageIdImpl); + assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId()); + assertEquals(lastMsgId.getEntryId(), messageId.getEntryId()); List lastMsgIds = reader.getConsumer().getLastMessageIds(); assertEquals(lastMsgIds.size(), 1); assertEquals(lastMsgIds.get(0).getOwnerTopic(), topicName); - MessageIdAdv lastMsgId = (MessageIdAdv) lastMsgIds.get(0); - assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId()); - assertEquals(lastMsgId.getEntryId(), messageId.getEntryId()); + MessageIdAdv lastMsgIdAdv = (MessageIdAdv) lastMsgIds.get(0); + assertEquals(lastMsgIdAdv.getLedgerId(), messageId.getLedgerId()); + assertEquals(lastMsgIdAdv.getEntryId(), messageId.getEntryId()); reader.close(); CountDownLatch latch = new CountDownLatch(numOfMessage); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 2b180b954d928..9fc323f837de9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -1136,6 +1136,24 @@ public void testGetLastMessageId() throws Exception { producer3.send((messagePredicate + "producer3-" + i).getBytes()); } + MessageId messageId = consumer.getLastMessageId(); + assertTrue(messageId instanceof MultiMessageIdImpl); + MultiMessageIdImpl multiMessageId = (MultiMessageIdImpl) messageId; + Map map = multiMessageId.getMap(); + assertEquals(map.size(), 6); + map.forEach((k, v) -> { + log.info("topic: {}, messageId:{} ", k, v.toString()); + assertTrue(v instanceof MessageIdImpl); + MessageIdImpl messageId1 = (MessageIdImpl) v; + if (k.contains(topicName1)) { + assertEquals(messageId1.entryId, totalMessages - 1); + } else if (k.contains(topicName2)) { + assertEquals(messageId1.entryId, totalMessages / 2 - 1); + } else { + assertEquals(messageId1.entryId, totalMessages / 3 - 1); + } + }); + List msgIds = consumer.getLastMessageIds(); assertEquals(msgIds.size(), 6); assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics); @@ -1156,6 +1174,25 @@ public void testGetLastMessageId() throws Exception { producer3.send((messagePredicate + "producer3-" + i).getBytes()); } + + messageId = consumer.getLastMessageId(); + assertTrue(messageId instanceof MultiMessageIdImpl); + MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId; + Map map2 = multiMessageId2.getMap(); + assertEquals(map2.size(), 6); + map2.forEach((k, v) -> { + log.info("topic: {}, messageId:{} ", k, v.toString()); + assertTrue(v instanceof MessageIdImpl); + MessageIdImpl messageId1 = (MessageIdImpl) v; + if (k.contains(topicName1)) { + assertEquals(messageId1.entryId, totalMessages * 2 - 1); + } else if (k.contains(topicName2)) { + assertEquals(messageId1.entryId, totalMessages - 1); + } else { + assertEquals(messageId1.entryId, totalMessages * 2 / 3 - 1); + } + }); + msgIds = consumer.getLastMessageIds(); assertEquals(msgIds.size(), 6); assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java index 07e89c1dc7ab8..10d805cdc4db3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import java.io.IOException; +import java.util.Collections; import org.testng.annotations.Test; public class BatchMessageIdImplTest { @@ -67,6 +68,18 @@ public void equalsTest() { assertEquals(msgId, batchMsgId4); } + @Test + public void notEqualsMultiTest() { + BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0); + MessageIdImpl msgId = new MessageIdImpl(0, 0, 0); + MultiMessageIdImpl multiMsgId = new MultiMessageIdImpl(Collections.singletonMap("topic", msgId)); + + assertNotEquals(msgId, multiMsgId); + assertNotEquals(multiMsgId, msgId); + assertNotEquals(batchMsgId, multiMsgId); + assertNotEquals(multiMsgId, batchMsgId); + } + @Test public void compareToUnbatchedTest() { MessageIdImpl msgId = new MessageIdImpl(1, 2, 3); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 99435118b2863..4f0eca6ea4af8 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -19,8 +19,14 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.client.api.MessageId; import org.testng.annotations.Test; /** @@ -182,4 +188,220 @@ public void testBatchMessageIdImplCompareToTopicMessageId() { assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); } + + @Test + public void testMultiMessageIdEqual() { + // null + MultiMessageIdImpl null1 = new MultiMessageIdImpl(null); + MultiMessageIdImpl null2 = new MultiMessageIdImpl(null); + assertEquals(null1, null2); + + // empty + MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap()); + MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap()); + assertEquals(empty1, empty2); + + // null empty + assertEquals(null1, empty2); + assertEquals(empty2, null1); + + // 1 item + String topic1 = "topicName1"; + MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); + MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); + MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567); + + MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1)); + MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2)); + assertEquals(item1, item2); + + // 1 item, empty not equal + assertNotEquals(item1, null1); + assertNotEquals(null1, item1); + + // key not equal + String topic2 = "topicName2"; + MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2)); + assertNotEquals(item1, item3); + assertNotEquals(item3, item1); + + // value not equal + MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3)); + assertNotEquals(item1, item4); + assertNotEquals(item4, item1); + + // key value not equal + assertNotEquals(item3, item4); + assertNotEquals(item4, item3); + + // 2 items + Map map1 = new HashMap<>(); + Map map2 = new HashMap<>(); + map1.put(topic1, messageIdImpl1); + map1.put(topic2, messageIdImpl2); + map2.put(topic2, messageIdImpl2); + map2.put(topic1, messageIdImpl1); + + MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1); + MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2); + + assertEquals(item5, item6); + + assertNotEquals(item5, null1); + assertNotEquals(item5, empty1); + assertNotEquals(item5, item1); + assertNotEquals(item5, item3); + assertNotEquals(item5, item4); + + assertNotEquals(null1, item5); + assertNotEquals(empty1, item5); + assertNotEquals(item1, item5); + assertNotEquals(item3, item5); + assertNotEquals(item4, item5); + + map2.put(topic1, messageIdImpl3); + MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2); + assertNotEquals(item5, item7); + assertNotEquals(item7, item5); + } + + @Test + public void testMultiMessageIdCompareto() { + // null + MultiMessageIdImpl null1 = new MultiMessageIdImpl(null); + MultiMessageIdImpl null2 = new MultiMessageIdImpl(null); + assertEquals(0, null1.compareTo(null2)); + + // empty + MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap()); + MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap()); + assertEquals(0, empty1.compareTo(empty2)); + + // null empty + assertEquals(0, null1.compareTo(empty2)); + assertEquals(0, empty2.compareTo(null1)); + + // 1 item + String topic1 = "topicName1"; + MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); + MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); + MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567); + + MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1)); + MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2)); + assertEquals(0, item1.compareTo(item2)); + + // 1 item, empty not equal + try { + item1.compareTo(null1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + try { + null1.compareTo(item1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + // key not equal + String topic2 = "topicName2"; + MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2)); + try { + item1.compareTo(item3); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + try { + item3.compareTo(item1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + // value not equal + MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3)); + assertTrue(item1.compareTo(item4) < 0); + assertTrue(item4.compareTo(item1) > 0); + + // key value not equal + try { + item3.compareTo(item4); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + try { + item4.compareTo(item3); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + // 2 items + Map map1 = new HashMap<>(); + Map map2 = new HashMap<>(); + map1.put(topic1, messageIdImpl1); + map1.put(topic2, messageIdImpl2); + map2.put(topic2, messageIdImpl2); + map2.put(topic1, messageIdImpl1); + + MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1); + MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2); + + assertTrue(item5.compareTo(item6) == 0); + + try { + item5.compareTo(null1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + item5.compareTo(empty1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + item5.compareTo(item1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + item5.compareTo(item3); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + item5.compareTo(item4); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + map2.put(topic1, messageIdImpl3); + MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2); + + assertTrue(item7.compareTo(item5) > 0); + assertTrue(item5.compareTo(item7) < 0); + + Map map3 = new HashMap<>(); + map3.put(topic1, messageIdImpl3); + map3.put(topic2, messageIdImpl3); + MultiMessageIdImpl item8 = new MultiMessageIdImpl(map3); + assertTrue(item8.compareTo(item5) > 0); + assertTrue(item8.compareTo(item7) > 0); + + assertTrue(item5.compareTo(item8) < 0); + assertTrue(item7.compareTo(item8) < 0); + } }