Skip to content

Commit

Permalink
Restore the tests related to MultiMessageIdImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Apr 10, 2023
1 parent 3435801 commit fd129bb
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1092,12 +1092,14 @@ public void testHasMessageAvailable() throws Exception {
assertFalse(messageId instanceof BatchMessageIdImpl);
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
assertFalse(lastMsgId instanceof BatchMessageIdImpl);
List<TopicMessageId> lastMsgIds = reader.getConsumer().getLastMessageIds();
assertEquals(lastMsgIds.size(), 1);
assertEquals(lastMsgIds.get(0).getOwnerTopic(), topicName);
MessageIdAdv lastMsgId = (MessageIdAdv) lastMsgIds.get(0);
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
MessageIdAdv lastMsgIdAdv = (MessageIdAdv) lastMsgIds.get(0);
assertEquals(lastMsgIdAdv.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgIdAdv.getEntryId(), messageId.getEntryId());
reader.close();

CountDownLatch latch = new CountDownLatch(numOfMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,24 @@ public void testGetLastMessageId() throws Exception {
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}

MessageId messageId = consumer.getLastMessageId();
assertTrue(messageId instanceof MultiMessageIdImpl);
MultiMessageIdImpl multiMessageId = (MultiMessageIdImpl) messageId;
Map<String, MessageId> map = multiMessageId.getMap();
assertEquals(map.size(), 6);
map.forEach((k, v) -> {
log.info("topic: {}, messageId:{} ", k, v.toString());
assertTrue(v instanceof MessageIdImpl);
MessageIdImpl messageId1 = (MessageIdImpl) v;
if (k.contains(topicName1)) {
assertEquals(messageId1.entryId, totalMessages - 1);
} else if (k.contains(topicName2)) {
assertEquals(messageId1.entryId, totalMessages / 2 - 1);
} else {
assertEquals(messageId1.entryId, totalMessages / 3 - 1);
}
});

List<TopicMessageId> msgIds = consumer.getLastMessageIds();
assertEquals(msgIds.size(), 6);
assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics);
Expand All @@ -1156,6 +1174,25 @@ public void testGetLastMessageId() throws Exception {
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}


messageId = consumer.getLastMessageId();
assertTrue(messageId instanceof MultiMessageIdImpl);
MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId;
Map<String, MessageId> map2 = multiMessageId2.getMap();
assertEquals(map2.size(), 6);
map2.forEach((k, v) -> {
log.info("topic: {}, messageId:{} ", k, v.toString());
assertTrue(v instanceof MessageIdImpl);
MessageIdImpl messageId1 = (MessageIdImpl) v;
if (k.contains(topicName1)) {
assertEquals(messageId1.entryId, totalMessages * 2 - 1);
} else if (k.contains(topicName2)) {
assertEquals(messageId1.entryId, totalMessages - 1);
} else {
assertEquals(messageId1.entryId, totalMessages * 2 / 3 - 1);
}
});

msgIds = consumer.getLastMessageIds();
assertEquals(msgIds.size(), 6);
assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import java.io.IOException;
import java.util.Collections;
import org.testng.annotations.Test;

public class BatchMessageIdImplTest {
Expand Down Expand Up @@ -67,6 +68,18 @@ public void equalsTest() {
assertEquals(msgId, batchMsgId4);
}

@Test
public void notEqualsMultiTest() {
BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0);
MessageIdImpl msgId = new MessageIdImpl(0, 0, 0);
MultiMessageIdImpl multiMsgId = new MultiMessageIdImpl(Collections.singletonMap("topic", msgId));

assertNotEquals(msgId, multiMsgId);
assertNotEquals(multiMsgId, msgId);
assertNotEquals(batchMsgId, multiMsgId);
assertNotEquals(multiMsgId, batchMsgId);
}

@Test
public void compareToUnbatchedTest() {
MessageIdImpl msgId = new MessageIdImpl(1, 2, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -182,4 +188,220 @@ public void testBatchMessageIdImplCompareToTopicMessageId() {
assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than");
assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than");
}

@Test
public void testMultiMessageIdEqual() {
// null
MultiMessageIdImpl null1 = new MultiMessageIdImpl(null);
MultiMessageIdImpl null2 = new MultiMessageIdImpl(null);
assertEquals(null1, null2);

// empty
MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap());
MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap());
assertEquals(empty1, empty2);

// null empty
assertEquals(null1, empty2);
assertEquals(empty2, null1);

// 1 item
String topic1 = "topicName1";
MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567);

MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1));
MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2));
assertEquals(item1, item2);

// 1 item, empty not equal
assertNotEquals(item1, null1);
assertNotEquals(null1, item1);

// key not equal
String topic2 = "topicName2";
MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2));
assertNotEquals(item1, item3);
assertNotEquals(item3, item1);

// value not equal
MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3));
assertNotEquals(item1, item4);
assertNotEquals(item4, item1);

// key value not equal
assertNotEquals(item3, item4);
assertNotEquals(item4, item3);

// 2 items
Map<String, MessageId> map1 = new HashMap<>();
Map<String, MessageId> map2 = new HashMap<>();
map1.put(topic1, messageIdImpl1);
map1.put(topic2, messageIdImpl2);
map2.put(topic2, messageIdImpl2);
map2.put(topic1, messageIdImpl1);

MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1);
MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2);

assertEquals(item5, item6);

assertNotEquals(item5, null1);
assertNotEquals(item5, empty1);
assertNotEquals(item5, item1);
assertNotEquals(item5, item3);
assertNotEquals(item5, item4);

assertNotEquals(null1, item5);
assertNotEquals(empty1, item5);
assertNotEquals(item1, item5);
assertNotEquals(item3, item5);
assertNotEquals(item4, item5);

map2.put(topic1, messageIdImpl3);
MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2);
assertNotEquals(item5, item7);
assertNotEquals(item7, item5);
}

@Test
public void testMultiMessageIdCompareto() {
// null
MultiMessageIdImpl null1 = new MultiMessageIdImpl(null);
MultiMessageIdImpl null2 = new MultiMessageIdImpl(null);
assertEquals(0, null1.compareTo(null2));

// empty
MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap());
MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap());
assertEquals(0, empty1.compareTo(empty2));

// null empty
assertEquals(0, null1.compareTo(empty2));
assertEquals(0, empty2.compareTo(null1));

// 1 item
String topic1 = "topicName1";
MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567);

MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1));
MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2));
assertEquals(0, item1.compareTo(item2));

// 1 item, empty not equal
try {
item1.compareTo(null1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
null1.compareTo(item1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}

// key not equal
String topic2 = "topicName2";
MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2));
try {
item1.compareTo(item3);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
item3.compareTo(item1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}

// value not equal
MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3));
assertTrue(item1.compareTo(item4) < 0);
assertTrue(item4.compareTo(item1) > 0);

// key value not equal
try {
item3.compareTo(item4);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
item4.compareTo(item3);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}

// 2 items
Map<String, MessageId> map1 = new HashMap<>();
Map<String, MessageId> map2 = new HashMap<>();
map1.put(topic1, messageIdImpl1);
map1.put(topic2, messageIdImpl2);
map2.put(topic2, messageIdImpl2);
map2.put(topic1, messageIdImpl1);

MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1);
MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2);

assertTrue(item5.compareTo(item6) == 0);

try {
item5.compareTo(null1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}

try {
item5.compareTo(empty1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}

try {
item5.compareTo(item1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}

try {
item5.compareTo(item3);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}

try {
item5.compareTo(item4);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}

map2.put(topic1, messageIdImpl3);
MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2);

assertTrue(item7.compareTo(item5) > 0);
assertTrue(item5.compareTo(item7) < 0);

Map<String, MessageId> map3 = new HashMap<>();
map3.put(topic1, messageIdImpl3);
map3.put(topic2, messageIdImpl3);
MultiMessageIdImpl item8 = new MultiMessageIdImpl(map3);
assertTrue(item8.compareTo(item5) > 0);
assertTrue(item8.compareTo(item7) > 0);

assertTrue(item5.compareTo(item8) < 0);
assertTrue(item7.compareTo(item8) < 0);
}
}

0 comments on commit fd129bb

Please sign in to comment.