Skip to content

Commit

Permalink
fix getMessageIdByTimestamp cannot redirect
Browse files Browse the repository at this point in the history
  • Loading branch information
liudezhi2098 committed Nov 29, 2024
1 parent 47ab166 commit a1df211
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1843,8 +1843,8 @@ public void getMessageIdByTimestamp(
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get message ID by timestamp {} from {}",
clientAppId(), timestamp, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
Expand Down Expand Up @@ -122,4 +125,27 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws
Assert.assertEquals(lookupResultSet.size(), 1);
}

@Test(timeOut = 30 * 1000)
public void testTopicGetMessageIdByTimestamp() throws Exception {
PulsarAdmin admin0 = getAllAdmins().get(0);
String topic = "public/default/t1";
admin0.topics().createPartitionedTopic(topic, 1);
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 20; i++) {
producer.send("msg".getBytes());
}
String brokerUrl = admin0.lookups().lookupTopic(topic + "-partition-0");
PulsarAdmin admin = null;
for (PulsarService additionalBroker : additionalBrokers) {
if (!brokerUrl.endsWith(String.valueOf(additionalBroker.getBrokerListenPort().get()))) {
admin = additionalBroker.getAdminClient();
}
}
Assert.assertNotNull(admin);
MessageId msgId =
admin.topics().getMessageIdByTimestamp(topic + "-partition-0", System.currentTimeMillis());
Assert.assertNotNull(msgId);
}
}

0 comments on commit a1df211

Please sign in to comment.