From 1113153bd33a59b5997a1473514cc63f34d54991 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 16 Dec 2024 10:11:58 +0800 Subject: [PATCH] [improve] [test] Add more test for the case that client receives a SendError, which relates to the PR #23038 (#23721) --- .../pulsar/client/impl/ClientCnxTest.java | 59 ++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index e25212e0108f8..35b89ad988cc4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -26,10 +26,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -44,6 +48,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker-impl") public class ClientCnxTest extends MockedPulsarServiceBaseTest { @@ -137,6 +142,7 @@ public void testClientVersion() throws Exception { public void testCnxReceiveSendError() throws Exception { final String topicOne = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-one"; final String topicTwo = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-two"; + final String topicThree = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-three"; PulsarClient client = PulsarClient.builder().serviceUrl(lookupUrl.toString()).connectionsPerBroker(1).build(); Producer producerOne = client.newProducer(Schema.STRING) @@ -145,22 +151,31 @@ public void testCnxReceiveSendError() throws Exception { Producer producerTwo = client.newProducer(Schema.STRING) .topic(topicTwo) .create(); + Producer producerThree = client.newProducer(Schema.STRING) + .topic(topicThree).producerName("three") + .create(); ClientCnx cnxOne = ((ProducerImpl) producerOne).getClientCnx(); ClientCnx cnxTwo = ((ProducerImpl) producerTwo).getClientCnx(); + ClientCnx cnxThree = ((ProducerImpl) producerTwo).getClientCnx(); // simulate a sending error cnxOne.handleSendError(Commands.newSendErrorCommand(((ProducerImpl) producerOne).producerId, - 10, ServerError.PersistenceError, "persistent error").getSendError()); + 10, ServerError.PersistenceError, "persistent error 1").getSendError()); + cnxThree.handleSendError(Commands.newSendErrorCommand(((ProducerImpl) producerOne).producerId, + 10, ServerError.PersistenceError, "persistent error 3").getSendError()); // two producer use the same cnx Assert.assertEquals(cnxOne, cnxTwo); + Assert.assertEquals(cnxThree, cnxTwo); // the cnx will not change try { Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> (((ProducerImpl) producerOne).getClientCnx() != null && !cnxOne.equals(((ProducerImpl) producerOne).getClientCnx())) - || !cnxTwo.equals(((ProducerImpl) producerTwo).getClientCnx())); + || (((ProducerImpl) producerThree).getClientCnx() != null + && !cnxThree.equals(((ProducerImpl) producerThree).getClientCnx())) + || !cnxTwo.equals(((ProducerImpl) producerTwo).getClientCnx())); Assert.fail(); } catch (Throwable e) { Assert.assertTrue(e instanceof ConditionTimeoutException); @@ -173,11 +188,51 @@ public void testCnxReceiveSendError() throws Exception { // producer also can send message producerOne.send("test"); producerTwo.send("test"); + producerThree.send("test"); producerTwo.close(); producerOne.close(); + producerThree.close(); client.close(); } + @Test + public void testCnxReceiveSendErrorWithMultiConnectionsPerBroker() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl(lookupUrl.toString()) + .connectionsPerBroker(1000).build(); + + // Create a producer with customized name. + final String tp = BrokerTestUtil.newUniqueName(NAMESPACE + "/tp"); + admin.topics().createNonPartitionedTopic(tp); + ProducerImpl p = + (ProducerImpl) client.newProducer(Schema.STRING).producerName("p1").topic(tp).create(); + + // Inject a persistence error. + org.apache.pulsar.broker.service.Producer serverProducer = pulsar.getBrokerService().getTopic(tp, false) + .join().get().getProducers().values().iterator().next(); + ServerCnx serverCnx = (ServerCnx) serverProducer.getCnx(); + serverCnx.getCommandSender().sendSendError(serverProducer.getProducerId(), 1/* sequenceId */, + ServerError.PersistenceError, "mocked error"); + + // Wait for the client receives the error. + // If the client confirmed two Pings, it means the client has handled the PersistenceError we sent. + serverCnx.checkConnectionLiveness().join(); + serverCnx.checkConnectionLiveness().join(); + + try { + // Verify: the next publish will finish. + MessageId messageId = p.sendAsync("1").get(10, TimeUnit.SECONDS); + MessageIdAdv messageIdAdv = (MessageIdAdv) messageId; + log.info("sent {}:{}", messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + } finally { + // cleanup orphan producers. + serverCnx.ctx().close(); + // cleanup + client.close(); + p.close(); + admin.topics().delete(tp); + } + } + public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception { final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp"); admin.topics().createNonPartitionedTopic(topic);