From 8ff51ebb6979664d006c175f6fb1ad4ae89cde8a Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Mon, 18 Sep 2023 23:29:49 +0800 Subject: [PATCH] [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem. (#21161) --- pulsar-metadata/pom.xml | 2 - .../PulsarLedgerUnderreplicationManager.java | 64 ++-- .../replication/AuditorLedgerCheckerTest.java | 312 ++++++++++++++++++ .../LedgerUnderreplicationManagerTest.java | 19 +- 4 files changed, 371 insertions(+), 26 deletions(-) create mode 100644 pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml index c4f7c4ee81061..1e4994e768e2a 100644 --- a/pulsar-metadata/pom.xml +++ b/pulsar-metadata/pom.xml @@ -57,7 +57,6 @@ - io.dropwizard.metrics metrics-core @@ -85,7 +84,6 @@ - org.xerial.snappy snappy-java diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index aaa247bfb32fc..2673328b81139 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -105,14 +105,17 @@ long getLedgerNodeVersion() { private final String urLockPath; private final String layoutPath; private final String lostBookieRecoveryDelayPath; + private final String replicationDisablePath; private final String checkAllLedgersCtimePath; private final String placementPolicyCheckCtimePath; private final String replicasCheckCtimePath; private final MetadataStoreExtended store; - private BookkeeperInternalCallbacks.GenericCallback replicationEnabledListener; - private BookkeeperInternalCallbacks.GenericCallback lostBookieRecoveryDelayListener; + private final List> replicationEnabledCallbacks = + new ArrayList<>(); + private final List> lostBookieRecoveryDelayCallbacks = + new ArrayList<>(); private static class PulsarUnderreplicatedLedger extends UnderreplicatedLedger { PulsarUnderreplicatedLedger(long ledgerId) { @@ -139,6 +142,7 @@ public PulsarLedgerUnderreplicationManager(AbstractConfiguration conf, Metada urLedgerPath = basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH; urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK; lostBookieRecoveryDelayPath = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE; + replicationDisablePath = basePath + '/' + BookKeeperConstants.DISABLE_NODE; checkAllLedgersCtimePath = basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME; placementPolicyCheckCtimePath = basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME; replicasCheckCtimePath = basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME; @@ -232,17 +236,34 @@ private void handleNotification(Notification n) { synchronized (this) { // Notify that there were some changes on the under-replicated z-nodes notifyAll(); - - if (n.getType() == NotificationType.Deleted) { - if (n.getPath().equals(basePath + '/' + BookKeeperConstants.DISABLE_NODE)) { - log.info("LedgerReplication is enabled externally through MetadataStore, " - + "since DISABLE_NODE ZNode is deleted"); - if (replicationEnabledListener != null) { - replicationEnabledListener.operationComplete(0, null); + if (lostBookieRecoveryDelayPath.equals(n.getPath())) { + final List> callbackList; + synchronized (lostBookieRecoveryDelayCallbacks) { + callbackList = new ArrayList<>(lostBookieRecoveryDelayCallbacks); + lostBookieRecoveryDelayCallbacks.clear(); + } + for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { + try { + callback.operationComplete(0, null); + } catch (Exception e) { + log.warn("lostBookieRecoveryDelayCallbacks handle error", e); } - } else if (n.getPath().equals(lostBookieRecoveryDelayPath)) { - if (lostBookieRecoveryDelayListener != null) { - lostBookieRecoveryDelayListener.operationComplete(0, null); + } + return; + } + if (replicationDisablePath.equals(n.getPath()) && n.getType() == NotificationType.Deleted) { + log.info("LedgerReplication is enabled externally through MetadataStore, " + + "since DISABLE_NODE ZNode is deleted"); + final List> callbackList; + synchronized (replicationEnabledCallbacks) { + callbackList = new ArrayList<>(replicationEnabledCallbacks); + replicationEnabledCallbacks.clear(); + } + for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { + try { + callback.operationComplete(0, null); + } catch (Exception e) { + log.warn("replicationEnabledCallbacks handle error", e); } } } @@ -688,8 +709,7 @@ public void disableLedgerReplication() log.debug("disableLedegerReplication()"); } try { - String path = basePath + '/' + BookKeeperConstants.DISABLE_NODE; - store.put(path, "".getBytes(UTF_8), Optional.of(-1L)) + store.put(replicationDisablePath, "".getBytes(UTF_8), Optional.of(-1L)) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); log.info("Auto ledger re-replication is disabled!"); } catch (ExecutionException | TimeoutException ee) { @@ -710,7 +730,7 @@ public void enableLedgerReplication() log.debug("enableLedegerReplication()"); } try { - store.delete(basePath + '/' + BookKeeperConstants.DISABLE_NODE, Optional.empty()) + store.delete(replicationDisablePath, Optional.empty()) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); log.info("Resuming automatic ledger re-replication"); } catch (ExecutionException | TimeoutException ee) { @@ -731,7 +751,7 @@ public boolean isLedgerReplicationEnabled() log.debug("isLedgerReplicationEnabled()"); } try { - return !store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE) + return !store.exists(replicationDisablePath) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (ExecutionException | TimeoutException ee) { log.error("Error while checking the state of " @@ -751,13 +771,11 @@ public void notifyLedgerReplicationEnabled(final BookkeeperInternalCallbacks.Gen if (log.isDebugEnabled()) { log.debug("notifyLedgerReplicationEnabled()"); } - - synchronized (this) { - replicationEnabledListener = cb; + synchronized (replicationEnabledCallbacks) { + replicationEnabledCallbacks.add(cb); } - try { - if (!store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE) + if (!store.exists(replicationDisablePath) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { log.info("LedgerReplication is enabled externally through metadata store, " + "since DISABLE_NODE node is deleted"); @@ -851,8 +869,8 @@ public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableE public void notifyLostBookieRecoveryDelayChanged(BookkeeperInternalCallbacks.GenericCallback cb) throws ReplicationException.UnavailableException { log.debug("notifyLostBookieRecoveryDelayChanged()"); - synchronized (this) { - lostBookieRecoveryDelayListener = cb; + synchronized (lostBookieRecoveryDelayCallbacks) { + lostBookieRecoveryDelayCallbacks.add(cb); } try { if (!store.exists(lostBookieRecoveryDelayPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java new file mode 100644 index 0000000000000..d394e4ae7d1d5 --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.replication; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertNotNull; +import static org.testng.AssertJUnit.assertNotSame; +import static org.testng.AssertJUnit.assertTrue; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Tests publishing of under replicated ledgers by the Auditor bookie node when + * corresponding bookies identifes as not running. + */ +public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { + + // Depending on the taste, select the amount of logging + // by decommenting one of the two lines below + // private static final Logger LOG = Logger.getRootLogger(); + private static final Logger LOG = LoggerFactory + .getLogger(AuditorLedgerCheckerTest.class); + + private static final byte[] ledgerPassword = "aaa".getBytes(); + private Random rng; // Random Number Generator + + private DigestType digestType; + + private String underreplicatedPath; + private Map auditorElectors = new ConcurrentHashMap<>(); + private ZkLedgerUnderreplicationManager urLedgerMgr; + private Set urLedgerList; + private String electionPath; + + private List ledgerList; + + public AuditorLedgerCheckerTest() + throws IOException, KeeperException, InterruptedException, + CompatibilityException { + this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory"); + } + + AuditorLedgerCheckerTest(String ledgerManagerFactoryClass) + throws IOException, KeeperException, InterruptedException, + CompatibilityException { + super(3); + LOG.info("Running test case using ledger manager : " + + ledgerManagerFactoryClass); + this.digestType = DigestType.CRC32; + // set ledger manager name + baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactoryClass); + baseClientConf + .setLedgerManagerFactoryClassName(ledgerManagerFactoryClass); + } + + @BeforeMethod + public void setUp() throws Exception { + super.setUp(); + underreplicatedPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf) + + "/underreplication/ledgers"; + electionPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf) + + "/underreplication/auditorelection"; + + urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc); + urLedgerMgr.setCheckAllLedgersCTime(System.currentTimeMillis()); + startAuditorElectors(); + rng = new Random(System.currentTimeMillis()); // Initialize the Random + urLedgerList = new HashSet(); + ledgerList = new ArrayList(2); + baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + } + + @Override + public void tearDown() throws Exception { + stopAuditorElectors(); + super.tearDown(); + } + + private void startAuditorElectors() throws Exception { + for (String addr : bookieAddresses().stream().map(Object::toString) + .collect(Collectors.toList())) { + AuditorElector auditorElector = new AuditorElector(addr, baseConf); + auditorElectors.put(addr, auditorElector); + auditorElector.start(); + if (LOG.isDebugEnabled()) { + LOG.debug("Starting Auditor Elector"); + } + } + } + + private void stopAuditorElectors() throws Exception { + for (AuditorElector auditorElector : auditorElectors.values()) { + auditorElector.shutdown(); + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping Auditor Elector!"); + } + } + } + + @Test + public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws Exception { + // wait for a second so that the initial periodic check finishes + Thread.sleep(1000); + + Auditor auditorBookiesAuditor = getAuditorBookiesAuditor(); + LedgerHandle lh1 = createAndAddEntriesToLedger(); + Long ledgerId = lh1.getId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Created ledger : " + ledgerId); + } + ledgerList.add(ledgerId); + lh1.close(); + + final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList + .size()); + + int lostBookieRecoveryDelay = 5; + // wait for 5 seconds before starting the recovery work when a bookie fails + urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay); + + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + String shutdownBookie = shutDownNonAuditorBookie(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for ledgers to be marked as under replicated"); + } + assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); + assertEquals("under replicated ledgers identified when it was not expected", 0, + urLedgerList.size()); + + Future auditTask = auditorBookiesAuditor.getAuditTask(); + assertNotSame("auditTask is not supposed to be null", null, auditTask); + assertEquals( + "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set", + lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange()); + + // set lostBookieRecoveryDelay to 0, so that Auditor is triggered immediately + urLedgerMgr.setLostBookieRecoveryDelay(0); + assertTrue("audit of lost bookie shouldn't be delayed", underReplicaLatch.await(1, TimeUnit.SECONDS)); + assertEquals("all under replicated ledgers should be identified", ledgerList.size(), + urLedgerList.size()); + + Thread.sleep(100); + auditTask = auditorBookiesAuditor.getAuditTask(); + assertEquals("auditTask is supposed to be null", null, auditTask); + assertEquals( + "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value", + 0, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange()); + } + + private CountDownLatch registerUrLedgerWatcher(int count) + throws KeeperException, InterruptedException { + final CountDownLatch underReplicaLatch = new CountDownLatch(count); + for (Long ledgerId : ledgerList) { + Watcher urLedgerWatcher = new ChildWatcher(underReplicaLatch); + String znode = ZkLedgerUnderreplicationManager.getUrLedgerZnode(underreplicatedPath, + ledgerId); + zkc.exists(znode, urLedgerWatcher); + } + return underReplicaLatch; + } + + private String shutdownBookie(int bkShutdownIndex) throws Exception { + BookieServer bkServer = serverByIndex(bkShutdownIndex); + String bookieAddr = bkServer.getBookieId().toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("Shutting down bookie:" + bookieAddr); + } + killBookie(bkShutdownIndex); + auditorElectors.get(bookieAddr).shutdown(); + auditorElectors.remove(bookieAddr); + return bookieAddr; + } + + private LedgerHandle createAndAddEntriesToLedger() throws BKException, + InterruptedException { + int numEntriesToWrite = 100; + // Create a ledger + LedgerHandle lh = bkc.createLedger(digestType, ledgerPassword); + LOG.info("Ledger ID: " + lh.getId()); + addEntry(numEntriesToWrite, lh); + return lh; + } + + private void addEntry(int numEntriesToWrite, LedgerHandle lh) + throws InterruptedException, BKException { + final CountDownLatch completeLatch = new CountDownLatch(numEntriesToWrite); + final AtomicInteger rc = new AtomicInteger(BKException.Code.OK); + + for (int i = 0; i < numEntriesToWrite; i++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(Integer.MAX_VALUE)); + entry.position(0); + lh.asyncAddEntry(entry.array(), new AddCallback() { + public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) { + rc.compareAndSet(BKException.Code.OK, rc2); + completeLatch.countDown(); + } + }, null); + } + completeLatch.await(); + if (rc.get() != BKException.Code.OK) { + throw BKException.create(rc.get()); + } + + } + + private class ChildWatcher implements Watcher { + private final CountDownLatch underReplicaLatch; + + public ChildWatcher(CountDownLatch underReplicaLatch) { + this.underReplicaLatch = underReplicaLatch; + } + + @Override + public void process(WatchedEvent event) { + LOG.info("Received notification for the ledger path : " + + event.getPath()); + for (Long ledgerId : ledgerList) { + if (event.getPath().contains(ledgerId + "")) { + urLedgerList.add(ledgerId); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Count down and waiting for next notification"); + } + // count down and waiting for next notification + underReplicaLatch.countDown(); + } + } + + private BookieServer getAuditorBookie() throws Exception { + List auditors = new LinkedList(); + byte[] data = zkc.getData(electionPath, false, null); + assertNotNull("Auditor election failed", data); + for (int i = 0; i < bookieCount(); i++) { + BookieId bookieId = addressByIndex(i); + if (new String(data).contains(bookieId + "")) { + auditors.add(serverByIndex(i)); + } + } + assertEquals("Multiple Bookies acting as Auditor!", 1, auditors + .size()); + return auditors.get(0); + } + + private Auditor getAuditorBookiesAuditor() throws Exception { + BookieServer auditorBookieServer = getAuditorBookie(); + String bookieAddr = auditorBookieServer.getBookieId().toString(); + return auditorElectors.get(bookieAddr).auditor; + } + + private String shutDownNonAuditorBookie() throws Exception { + // shutdown bookie which is not an auditor + int indexOf = indexOfServer(getAuditorBookie()); + int bkIndexDownBookie; + if (indexOf < lastBookieIndex()) { + bkIndexDownBookie = indexOf + 1; + } else { + bkIndexDownBookie = indexOf - 1; + } + return shutdownBookie(bkIndexDownBookie); + } +} diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java index 649dc1663c68f..0e9c781fb9143 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.Cleanup; @@ -614,6 +615,8 @@ public void testDisableLedgerReplication(String provider, Supplier urlSu final String missingReplica = "localhost:3181"; // disabling replication + AtomicInteger callbackCount = new AtomicInteger(); + lum.notifyLedgerReplicationEnabled((rc, result) -> callbackCount.incrementAndGet()); lum.disableLedgerReplication(); log.info("Disabled Ledeger Replication"); @@ -631,6 +634,7 @@ public void testDisableLedgerReplication(String provider, Supplier urlSu } catch (TimeoutException te) { // expected behaviour, as the replication is disabled } + assertEquals(callbackCount.get(), 1, "Notify callback times mismatch"); } /** @@ -651,7 +655,8 @@ public void testEnableLedgerReplication(String provider, Supplier urlSup log.debug("Unexpected exception while marking urLedger", e); fail("Unexpected exception while marking urLedger" + e.getMessage()); } - + AtomicInteger callbackCount = new AtomicInteger(); + lum.notifyLedgerReplicationEnabled((rc, result) -> callbackCount.incrementAndGet()); // disabling replication lum.disableLedgerReplication(); log.debug("Disabled Ledeger Replication"); @@ -688,6 +693,7 @@ public void testEnableLedgerReplication(String provider, Supplier urlSup znodeLatch.await(5, TimeUnit.SECONDS); log.debug("Enabled Ledeger Replication"); assertEquals(znodeLatch.getCount(), 0, "Failed to disable ledger replication!"); + assertEquals(callbackCount.get(), 2, "Notify callback times mismatch"); } finally { thread1.interrupt(); } @@ -749,6 +755,17 @@ public void testReplicasCheckCTime(String provider, Supplier urlSupplier assertEquals(underReplicaMgr1.getReplicasCheckCTime(), curTime); } + @Test(timeOut = 60000, dataProvider = "impl") + public void testLostBookieRecoveryDelay(String provider, Supplier urlSupplier) throws Exception { + methodSetup(urlSupplier); + + AtomicInteger callbackCount = new AtomicInteger(); + lum.notifyLostBookieRecoveryDelayChanged((rc, result) -> callbackCount.incrementAndGet()); + // disabling replication + lum.setLostBookieRecoveryDelay(10); + Awaitility.await().until(() -> callbackCount.get() == 2); + } + private void verifyMarkLedgerUnderreplicated(Collection missingReplica) throws Exception { Long ledgerA = 0xfeadeefdacL; String znodeA = getUrLedgerZnode(ledgerA);