Skip to content

Commit

Permalink
[improve] [test] Add more test for the case that client receives a Se…
Browse files Browse the repository at this point in the history
…ndError, which relates to the PR apache#23038 (apache#23721)
  • Loading branch information
poorbarcode authored Dec 16, 2024
1 parent 0ae3f9d commit 1113153
Showing 1 changed file with 57 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -44,6 +48,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-impl")
public class ClientCnxTest extends MockedPulsarServiceBaseTest {

Expand Down Expand Up @@ -137,6 +142,7 @@ public void testClientVersion() throws Exception {
public void testCnxReceiveSendError() throws Exception {
final String topicOne = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-one";
final String topicTwo = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-two";
final String topicThree = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-three";

PulsarClient client = PulsarClient.builder().serviceUrl(lookupUrl.toString()).connectionsPerBroker(1).build();
Producer<String> producerOne = client.newProducer(Schema.STRING)
Expand All @@ -145,22 +151,31 @@ public void testCnxReceiveSendError() throws Exception {
Producer<String> producerTwo = client.newProducer(Schema.STRING)
.topic(topicTwo)
.create();
Producer<String> producerThree = client.newProducer(Schema.STRING)
.topic(topicThree).producerName("three")
.create();
ClientCnx cnxOne = ((ProducerImpl<?>) producerOne).getClientCnx();
ClientCnx cnxTwo = ((ProducerImpl<?>) producerTwo).getClientCnx();
ClientCnx cnxThree = ((ProducerImpl<?>) producerTwo).getClientCnx();

// simulate a sending error
cnxOne.handleSendError(Commands.newSendErrorCommand(((ProducerImpl<?>) producerOne).producerId,
10, ServerError.PersistenceError, "persistent error").getSendError());
10, ServerError.PersistenceError, "persistent error 1").getSendError());
cnxThree.handleSendError(Commands.newSendErrorCommand(((ProducerImpl<?>) producerOne).producerId,
10, ServerError.PersistenceError, "persistent error 3").getSendError());

// two producer use the same cnx
Assert.assertEquals(cnxOne, cnxTwo);
Assert.assertEquals(cnxThree, cnxTwo);

// the cnx will not change
try {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
(((ProducerImpl<?>) producerOne).getClientCnx() != null
&& !cnxOne.equals(((ProducerImpl<?>) producerOne).getClientCnx()))
|| !cnxTwo.equals(((ProducerImpl<?>) producerTwo).getClientCnx()));
|| (((ProducerImpl<?>) producerThree).getClientCnx() != null
&& !cnxThree.equals(((ProducerImpl<?>) producerThree).getClientCnx()))
|| !cnxTwo.equals(((ProducerImpl<?>) producerTwo).getClientCnx()));
Assert.fail();
} catch (Throwable e) {
Assert.assertTrue(e instanceof ConditionTimeoutException);
Expand All @@ -173,11 +188,51 @@ public void testCnxReceiveSendError() throws Exception {
// producer also can send message
producerOne.send("test");
producerTwo.send("test");
producerThree.send("test");
producerTwo.close();
producerOne.close();
producerThree.close();
client.close();
}

@Test
public void testCnxReceiveSendErrorWithMultiConnectionsPerBroker() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl(lookupUrl.toString())
.connectionsPerBroker(1000).build();

// Create a producer with customized name.
final String tp = BrokerTestUtil.newUniqueName(NAMESPACE + "/tp");
admin.topics().createNonPartitionedTopic(tp);
ProducerImpl<String> p =
(ProducerImpl<String>) client.newProducer(Schema.STRING).producerName("p1").topic(tp).create();

// Inject a persistence error.
org.apache.pulsar.broker.service.Producer serverProducer = pulsar.getBrokerService().getTopic(tp, false)
.join().get().getProducers().values().iterator().next();
ServerCnx serverCnx = (ServerCnx) serverProducer.getCnx();
serverCnx.getCommandSender().sendSendError(serverProducer.getProducerId(), 1/* sequenceId */,
ServerError.PersistenceError, "mocked error");

// Wait for the client receives the error.
// If the client confirmed two Pings, it means the client has handled the PersistenceError we sent.
serverCnx.checkConnectionLiveness().join();
serverCnx.checkConnectionLiveness().join();

try {
// Verify: the next publish will finish.
MessageId messageId = p.sendAsync("1").get(10, TimeUnit.SECONDS);
MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
log.info("sent {}:{}", messageIdAdv.getLedgerId(), messageIdAdv.getEntryId());
} finally {
// cleanup orphan producers.
serverCnx.ctx().close();
// cleanup
client.close();
p.close();
admin.topics().delete(tp);
}
}

public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception {
final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp");
admin.topics().createNonPartitionedTopic(topic);
Expand Down

0 comments on commit 1113153

Please sign in to comment.