diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 34d1d15764d39..a22418ad8e668 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.StringReader; import java.lang.reflect.Array; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -67,6 +68,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; @@ -102,6 +104,10 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.MockZooKeeper; +import org.glassfish.jersey.client.JerseyClient; +import org.glassfish.jersey.client.JerseyClientBuilder; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -1461,4 +1467,117 @@ public void testDuplicateAcknowledgement() throws Exception { assertEquals(admin.topics().getStats(topicName).getSubscriptions() .get("sub-1").getUnackedMessages(), 0); } + + @Test + public void testMetricsPersistentTopicLoadFails() throws Exception { + final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", ""); + String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().unload(topic); + + // Inject an error that makes the topic load fails. + AtomicBoolean failMarker = new AtomicBoolean(true); + mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> { + if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) && + path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) { + return true; + } + return false; + }); + + // Do test + Thread.sleep(1000 * 3); + CompletableFuture> producer = pulsarClient.newProducer().topic(topic).createAsync(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().until(() -> { + String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + BufferedReader reader = new BufferedReader(new StringReader(response)); + String line; + String metricsLine = null; + while ((line = reader.readLine()) != null) { + if (StringUtils.isBlank(line)) { + continue; + } + if (line.startsWith("#")) { + continue; + } + if (line.contains("topic_load_failed")) { + metricsLine = line; + break; + } + } + log.info("topic_load_failed: {}", metricsLine); + if (metricsLine == null) { + return false; + } + reader.close(); + String[] parts = metricsLine.split(" "); + Double value = Double.valueOf(parts[parts.length - 1]); + return value >= 1D; + }); + + // Remove the injection. + failMarker.set(false); + // cleanup. + httpClient.close(); + producer.join().close(); + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); + } + + @Test + public void testMetricsNonPersistentTopicLoadFails() throws Exception { + final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", ""); + String topic = "non-persistent://" + namespace + "/topic1_" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + + // Inject an error that makes the topic load fails. + pulsar.getConfiguration().setEnableNonPersistentTopics(false); + + // Do test. + CompletableFuture> producer = pulsarClient.newProducer().topic(topic).createAsync(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().until(() -> { + String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + BufferedReader reader = new BufferedReader(new StringReader(response)); + String line; + String metricsLine = null; + while ((line = reader.readLine()) != null) { + if (StringUtils.isBlank(line)) { + continue; + } + if (line.startsWith("#")) { + continue; + } + if (line.contains("topic_load_failed")) { + metricsLine = line; + break; + } + } + log.info("topic_load_failed: {}", metricsLine); + if (metricsLine == null) { + return false; + } + reader.close(); + String[] parts = metricsLine.split(" "); + Double value = Double.valueOf(parts[parts.length - 1]); + return value >= 1D; + }); + + // Remove the injection. + pulsar.getConfiguration().setEnableNonPersistentTopics(true); + + // cleanup. + httpClient.close(); + try { + producer.join().close(); + } catch (Exception ex) { + // The producer creation failed, so skip to close it. + } + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); + } }