diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index c2eb957ee605d..8b82b7641cd56 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -1092,8 +1092,10 @@ public void testHasMessageAvailable() throws Exception { assertFalse(messageId instanceof BatchMessageIdImpl); ReaderImpl reader = (ReaderImpl)pulsarClient.newReader().topic(topicName) .startMessageId(messageId).startMessageIdInclusive().create(); - MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId(); - assertFalse(lastMsgId instanceof BatchMessageIdImpl); + List lastMsgIds = reader.getConsumer().getLastMessageIds(); + assertEquals(lastMsgIds.size(), 1); + assertEquals(lastMsgIds.get(0).getOwnerTopic(), topicName); + MessageIdAdv lastMsgId = (MessageIdAdv) lastMsgIds.get(0); assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId()); assertEquals(lastMsgId.getEntryId(), messageId.getEntryId()); reader.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index ce4a0ae86ac4e..eb11e1a5bcdd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -42,7 +43,9 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.TopicMetadata; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; @@ -75,6 +78,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -1097,6 +1101,11 @@ public void testGetLastMessageId() throws Exception { admin.topics().createPartitionedTopic(topicName2, 2); admin.topics().createPartitionedTopic(topicName3, 3); + final Set topics = new HashSet<>(); + topics.add(topicName1); + IntStream.range(0, 2).forEach(i -> topics.add(topicName2 + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + IntStream.range(0, 3).forEach(i -> topics.add(topicName3 + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + // 1. producer connect Producer producer1 = pulsarClient.newProducer().topic(topicName1) .enableBatching(false) @@ -1128,23 +1137,20 @@ public void testGetLastMessageId() throws Exception { producer3.send((messagePredicate + "producer3-" + i).getBytes()); } - MessageId messageId = consumer.getLastMessageId(); - assertTrue(messageId instanceof MultiMessageIdImpl); - MultiMessageIdImpl multiMessageId = (MultiMessageIdImpl) messageId; - Map map = multiMessageId.getMap(); - assertEquals(map.size(), 6); - map.forEach((k, v) -> { - log.info("topic: {}, messageId:{} ", k, v.toString()); - assertTrue(v instanceof MessageIdImpl); - MessageIdImpl messageId1 = (MessageIdImpl) v; - if (k.contains(topicName1)) { - assertEquals(messageId1.entryId, totalMessages - 1); - } else if (k.contains(topicName2)) { - assertEquals(messageId1.entryId, totalMessages / 2 - 1); + assertThrows(PulsarClientException.class, consumer::getLastMessageId); + List msgIds = consumer.getLastMessageIds(); + assertEquals(msgIds.size(), 6); + assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics); + for (TopicMessageId msgId : msgIds) { + int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1; + if (msgId.getOwnerTopic().equals(topicName1)) { + assertEquals(numMessages, totalMessages); + } else if (msgId.getOwnerTopic().startsWith(topicName2)) { + assertEquals(numMessages, totalMessages / 2); } else { - assertEquals(messageId1.entryId, totalMessages / 3 - 1); + assertEquals(numMessages, totalMessages / 3); } - }); + } for (int i = 0; i < totalMessages; i++) { producer1.send((messagePredicate + "producer1-" + i).getBytes()); @@ -1152,23 +1158,20 @@ public void testGetLastMessageId() throws Exception { producer3.send((messagePredicate + "producer3-" + i).getBytes()); } - messageId = consumer.getLastMessageId(); - assertTrue(messageId instanceof MultiMessageIdImpl); - MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId; - Map map2 = multiMessageId2.getMap(); - assertEquals(map2.size(), 6); - map2.forEach((k, v) -> { - log.info("topic: {}, messageId:{} ", k, v.toString()); - assertTrue(v instanceof MessageIdImpl); - MessageIdImpl messageId1 = (MessageIdImpl) v; - if (k.contains(topicName1)) { - assertEquals(messageId1.entryId, totalMessages * 2 - 1); - } else if (k.contains(topicName2)) { - assertEquals(messageId1.entryId, totalMessages - 1); + assertThrows(PulsarClientException.class, consumer::getLastMessageId); + msgIds = consumer.getLastMessageIds(); + assertEquals(msgIds.size(), 6); + assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics); + for (TopicMessageId msgId : msgIds) { + int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1; + if (msgId.getOwnerTopic().equals(topicName1)) { + assertEquals(numMessages, totalMessages * 2); + } else if (msgId.getOwnerTopic().startsWith(topicName2)) { + assertEquals(numMessages, totalMessages / 2 * 2); } else { - assertEquals(messageId1.entryId, totalMessages * 2 / 3 - 1); + assertEquals(numMessages, totalMessages / 3 * 2); } - }); + } consumer.unsubscribe(); consumer.close(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index 694099004965a..a97200ab0d4cb 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.Closeable; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -536,19 +537,38 @@ CompletableFuture reconsumeLaterCumulativeAsync(Message message, CompletableFuture seekAsync(long timestamp); /** - * Get the last message id available for consume. + * Get the last message id of the topic subscribed. * - * @return the last message id. + * @return the last message id of the topic subscribed + * @throws PulsarClientException if multiple topics or partitioned topics are subscribed or failed because of a + * network issue + * NOTE: Use {@link Consumer#getLastMessageIds()} instead. */ + @Deprecated MessageId getLastMessageId() throws PulsarClientException; /** - * Get the last message id available for consume. - * - * @return a future that can be used to track the completion of the operation. + * The asynchronous version of {@link Consumer#getLastMessageId()}. + * NOTE: Use {@link Consumer#getLastMessageIdsAsync()} instead. */ + @Deprecated CompletableFuture getLastMessageIdAsync(); + /** + * Get all the last message id of the topics the consumer subscribed. + * + * @return the list of TopicMessageId instances of all the topics that the consumer subscribed + * @throws PulsarClientException if failed to get last message id. + * @apiNote It's guaranteed that the owner topic of each TopicMessageId in the returned list is different from owner + * topics of other TopicMessageId instances + */ + List getLastMessageIds() throws PulsarClientException; + + /** + * The asynchronous version of {@link Consumer#getLastMessageIds()}. + */ + CompletableFuture> getLastMessageIdsAsync(); + /** * @return Whether the consumer is connected to the broker */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 973b3302f4199..467a85030a01c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; @@ -745,6 +746,18 @@ public MessageId getLastMessageId() throws PulsarClientException { @Override public abstract CompletableFuture getLastMessageIdAsync(); + @Override + public List getLastMessageIds() throws PulsarClientException { + try { + return getLastMessageIdsAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw PulsarClientException.unwrap(e); + } catch (ExecutionException e) { + throw PulsarClientException.unwrap(e); + } + } + private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) { return SubscriptionType.Shared != type && SubscriptionType.Key_Shared != type; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fb372566426d3..1a557b6423493 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2341,6 +2341,12 @@ public CompletableFuture getLastMessageIdAsync() { return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId); } + @Override + public CompletableFuture> getLastMessageIdsAsync() { + return getLastMessageIdAsync() + .thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId))); + } + public CompletableFuture internalGetLastMessageIdAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java deleted file mode 100644 index 6e60239ffe537..0000000000000 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl; - -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import lombok.Getter; -import org.apache.pulsar.client.api.MessageId; - -/** - * A MessageId implementation that contains a map of . - * This is useful when MessageId is need for partition/multi-topics/pattern consumer. - * e.g. seek(), ackCumulative(), getLastMessageId(). - */ -public class MultiMessageIdImpl implements MessageId { - @Getter - private Map map; - - MultiMessageIdImpl(Map map) { - this.map = map; - } - - // TODO: Add support for Serialization and Deserialization - // https://github.com/apache/pulsar/issues/4940 - @Override - public byte[] toByteArray() { - throw new UnsupportedOperationException(); - } - - @Override - public int hashCode() { - return Objects.hash(map); - } - - // If all messageId in map are same size, and all bigger/smaller than the other, return valid value. - @Override - public int compareTo(MessageId o) { - if (!(o instanceof MultiMessageIdImpl)) { - throw new IllegalArgumentException( - "expected MultiMessageIdImpl object. Got instance of " + o.getClass().getName()); - } - - MultiMessageIdImpl other = (MultiMessageIdImpl) o; - Map otherMap = other.getMap(); - - if ((map == null || map.isEmpty()) && (otherMap == null || otherMap.isEmpty())) { - return 0; - } - - if (otherMap == null || map == null || otherMap.size() != map.size()) { - throw new IllegalArgumentException("Current size and other size not equals"); - } - - int result = 0; - for (Entry entry : map.entrySet()) { - MessageId otherMessage = otherMap.get(entry.getKey()); - if (otherMessage == null) { - throw new IllegalArgumentException( - "Other MessageId not have topic " + entry.getKey()); - } - - int currentResult = entry.getValue().compareTo(otherMessage); - if (result == 0) { - result = currentResult; - } else if (currentResult == 0) { - continue; - } else if (result != currentResult) { - throw new IllegalArgumentException( - "Different MessageId in Map get different compare result"); - } else { - continue; - } - } - - return result; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof MultiMessageIdImpl) { - MultiMessageIdImpl other = (MultiMessageIdImpl) obj; - - try { - return compareTo(other) == 0; - } catch (IllegalArgumentException e) { - return false; - } - } - return false; - } -} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 5fe0e4a82b840..1f4b69c13a9fd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Lists; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -49,7 +47,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerStats; @@ -1470,30 +1467,20 @@ public Timeout getPartitionsAutoUpdateTimeout() { @Override public CompletableFuture getLastMessageIdAsync() { - CompletableFuture returnFuture = new CompletableFuture<>(); - - Map> messageIdFutures = consumers.entrySet().stream() - .map(entry -> Pair.of(entry.getKey(), entry.getValue().getLastMessageIdAsync())) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - CompletableFuture - .allOf(messageIdFutures.values().toArray(new CompletableFuture[0])) - .whenComplete((ignore, ex) -> { - Builder builder = ImmutableMap.builder(); - messageIdFutures.forEach((key, future) -> { - MessageId messageId; - try { - messageId = future.get(); - } catch (Exception e) { - log.warn("[{}] Exception when topic {} getLastMessageId.", key, e); - messageId = MessageId.earliest; - } - builder.put(key, messageId); - }); - returnFuture.complete(new MultiMessageIdImpl(builder.build())); - }); + return FutureUtil.failedFuture(new PulsarClientException( + "getLastMessageIdAsync cannot be used on a multi-topics consumer")); + } - return returnFuture; + @Override + public CompletableFuture> getLastMessageIdsAsync() { + final List>> futures = consumers.values().stream() + .map(ConsumerImpl::getLastMessageIdsAsync) + .collect(Collectors.toList()); + return FutureUtil.waitForAll(futures).thenApply(__ -> { + final List messageIds = new ArrayList<>(); + futures.stream().map(CompletableFuture::join).forEach(messageIds::addAll); + return messageIds; + }); } private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class); diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml index 92ec9e934ee1e..0dc325f97eb8c 100644 --- a/pulsar-client/src/main/resources/findbugsExclude.xml +++ b/pulsar-client/src/main/resources/findbugsExclude.xml @@ -597,11 +597,6 @@ - - - - - diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java index 10d805cdc4db3..07e89c1dc7ab8 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java @@ -21,7 +21,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import java.io.IOException; -import java.util.Collections; import org.testng.annotations.Test; public class BatchMessageIdImplTest { @@ -68,18 +67,6 @@ public void equalsTest() { assertEquals(msgId, batchMsgId4); } - @Test - public void notEqualsMultiTest() { - BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0); - MessageIdImpl msgId = new MessageIdImpl(0, 0, 0); - MultiMessageIdImpl multiMsgId = new MultiMessageIdImpl(Collections.singletonMap("topic", msgId)); - - assertNotEquals(msgId, multiMsgId); - assertNotEquals(multiMsgId, msgId); - assertNotEquals(batchMsgId, multiMsgId); - assertNotEquals(multiMsgId, batchMsgId); - } - @Test public void compareToUnbatchedTest() { MessageIdImpl msgId = new MessageIdImpl(1, 2, 3); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 4f0eca6ea4af8..99435118b2863 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -19,14 +19,8 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.pulsar.client.api.MessageId; import org.testng.annotations.Test; /** @@ -188,220 +182,4 @@ public void testBatchMessageIdImplCompareToTopicMessageId() { assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); } - - @Test - public void testMultiMessageIdEqual() { - // null - MultiMessageIdImpl null1 = new MultiMessageIdImpl(null); - MultiMessageIdImpl null2 = new MultiMessageIdImpl(null); - assertEquals(null1, null2); - - // empty - MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap()); - MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap()); - assertEquals(empty1, empty2); - - // null empty - assertEquals(null1, empty2); - assertEquals(empty2, null1); - - // 1 item - String topic1 = "topicName1"; - MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); - MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); - MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567); - - MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1)); - MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2)); - assertEquals(item1, item2); - - // 1 item, empty not equal - assertNotEquals(item1, null1); - assertNotEquals(null1, item1); - - // key not equal - String topic2 = "topicName2"; - MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2)); - assertNotEquals(item1, item3); - assertNotEquals(item3, item1); - - // value not equal - MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3)); - assertNotEquals(item1, item4); - assertNotEquals(item4, item1); - - // key value not equal - assertNotEquals(item3, item4); - assertNotEquals(item4, item3); - - // 2 items - Map map1 = new HashMap<>(); - Map map2 = new HashMap<>(); - map1.put(topic1, messageIdImpl1); - map1.put(topic2, messageIdImpl2); - map2.put(topic2, messageIdImpl2); - map2.put(topic1, messageIdImpl1); - - MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1); - MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2); - - assertEquals(item5, item6); - - assertNotEquals(item5, null1); - assertNotEquals(item5, empty1); - assertNotEquals(item5, item1); - assertNotEquals(item5, item3); - assertNotEquals(item5, item4); - - assertNotEquals(null1, item5); - assertNotEquals(empty1, item5); - assertNotEquals(item1, item5); - assertNotEquals(item3, item5); - assertNotEquals(item4, item5); - - map2.put(topic1, messageIdImpl3); - MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2); - assertNotEquals(item5, item7); - assertNotEquals(item7, item5); - } - - @Test - public void testMultiMessageIdCompareto() { - // null - MultiMessageIdImpl null1 = new MultiMessageIdImpl(null); - MultiMessageIdImpl null2 = new MultiMessageIdImpl(null); - assertEquals(0, null1.compareTo(null2)); - - // empty - MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap()); - MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap()); - assertEquals(0, empty1.compareTo(empty2)); - - // null empty - assertEquals(0, null1.compareTo(empty2)); - assertEquals(0, empty2.compareTo(null1)); - - // 1 item - String topic1 = "topicName1"; - MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); - MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); - MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567); - - MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1)); - MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2)); - assertEquals(0, item1.compareTo(item2)); - - // 1 item, empty not equal - try { - item1.compareTo(null1); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - try { - null1.compareTo(item1); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - - // key not equal - String topic2 = "topicName2"; - MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2)); - try { - item1.compareTo(item3); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - try { - item3.compareTo(item1); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - - // value not equal - MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3)); - assertTrue(item1.compareTo(item4) < 0); - assertTrue(item4.compareTo(item1) > 0); - - // key value not equal - try { - item3.compareTo(item4); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - try { - item4.compareTo(item3); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - - // 2 items - Map map1 = new HashMap<>(); - Map map2 = new HashMap<>(); - map1.put(topic1, messageIdImpl1); - map1.put(topic2, messageIdImpl2); - map2.put(topic2, messageIdImpl2); - map2.put(topic1, messageIdImpl1); - - MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1); - MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2); - - assertTrue(item5.compareTo(item6) == 0); - - try { - item5.compareTo(null1); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - - try { - item5.compareTo(empty1); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - - try { - item5.compareTo(item1); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - - try { - item5.compareTo(item3); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - - try { - item5.compareTo(item4); - fail("should throw exception for not comparable"); - } catch (IllegalArgumentException e) { - // expected - } - - map2.put(topic1, messageIdImpl3); - MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2); - - assertTrue(item7.compareTo(item5) > 0); - assertTrue(item5.compareTo(item7) < 0); - - Map map3 = new HashMap<>(); - map3.put(topic1, messageIdImpl3); - map3.put(topic2, messageIdImpl3); - MultiMessageIdImpl item8 = new MultiMessageIdImpl(map3); - assertTrue(item8.compareTo(item5) > 0); - assertTrue(item8.compareTo(item7) > 0); - - assertTrue(item5.compareTo(item8) < 0); - assertTrue(item7.compareTo(item8) < 0); - } }