diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7846df838f6ed..0e638881c70db 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3983,7 +3983,8 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException { } } - synchronized void setFenced() { + @VisibleForTesting + public synchronized void setFenced() { log.info("{} Moving to Fenced state", name); STATE_UPDATER.set(this, State.Fenced); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 773fc7cab4a54..1d9751c0cd876 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1040,6 +1040,11 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException && isCompactionSubscription(subscriptionName)) { log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage()); + } else if (ex.getCause() instanceof ManagedLedgerFencedException) { + // If the topic has been fenced, we cannot continue using it. We need to close and reopen + log.warn("[{}][{}] has been fenced. closing the topic {}", topic, subscriptionName, + ex.getMessage()); + close(); } else { log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index f5c5e6742423c..4bcb0e0065c35 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4875,4 +4875,31 @@ private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2 return 0; } } + + @Test + public void testFencedLedger() throws Exception { + log.info("-- Starting {} test --", methodName); + + final String topic = "persistent://my-property/my-ns/fencedLedger"; + + @Cleanup + PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build(); + + @Cleanup + Producer producer = newPulsarClient.newProducer().topic(topic).enableBatching(false).create(); + + final int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync(); + } + producer.flush(); + + PersistentTopic pTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger(); + ml.setFenced(); + + Reader reader = newPulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest) + .createAsync().get(5, TimeUnit.SECONDS); + assertNotNull(reader); + } }