Skip to content

Commit

Permalink
[fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify …
Browse files Browse the repository at this point in the history
…problem. (apache#21161)
  • Loading branch information
horizonzy authored Sep 18, 2023
1 parent 1363777 commit 8ff51eb
Show file tree
Hide file tree
Showing 4 changed files with 371 additions and 26 deletions.
2 changes: 0 additions & 2 deletions pulsar-metadata/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
</exclusions>
</dependency>

<!-- zookeeper server -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
Expand Down Expand Up @@ -85,7 +84,6 @@
</exclusions>
</dependency>

<!-- zookeeper server -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,17 @@ long getLedgerNodeVersion() {
private final String urLockPath;
private final String layoutPath;
private final String lostBookieRecoveryDelayPath;
private final String replicationDisablePath;
private final String checkAllLedgersCtimePath;
private final String placementPolicyCheckCtimePath;
private final String replicasCheckCtimePath;

private final MetadataStoreExtended store;

private BookkeeperInternalCallbacks.GenericCallback<Void> replicationEnabledListener;
private BookkeeperInternalCallbacks.GenericCallback<Void> lostBookieRecoveryDelayListener;
private final List<BookkeeperInternalCallbacks.GenericCallback<Void>> replicationEnabledCallbacks =
new ArrayList<>();
private final List<BookkeeperInternalCallbacks.GenericCallback<Void>> lostBookieRecoveryDelayCallbacks =
new ArrayList<>();

private static class PulsarUnderreplicatedLedger extends UnderreplicatedLedger {
PulsarUnderreplicatedLedger(long ledgerId) {
Expand All @@ -139,6 +142,7 @@ public PulsarLedgerUnderreplicationManager(AbstractConfiguration<?> conf, Metada
urLedgerPath = basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
lostBookieRecoveryDelayPath = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
replicationDisablePath = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
checkAllLedgersCtimePath = basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
placementPolicyCheckCtimePath = basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
replicasCheckCtimePath = basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME;
Expand Down Expand Up @@ -232,17 +236,34 @@ private void handleNotification(Notification n) {
synchronized (this) {
// Notify that there were some changes on the under-replicated z-nodes
notifyAll();

if (n.getType() == NotificationType.Deleted) {
if (n.getPath().equals(basePath + '/' + BookKeeperConstants.DISABLE_NODE)) {
log.info("LedgerReplication is enabled externally through MetadataStore, "
+ "since DISABLE_NODE ZNode is deleted");
if (replicationEnabledListener != null) {
replicationEnabledListener.operationComplete(0, null);
if (lostBookieRecoveryDelayPath.equals(n.getPath())) {
final List<BookkeeperInternalCallbacks.GenericCallback<Void>> callbackList;
synchronized (lostBookieRecoveryDelayCallbacks) {
callbackList = new ArrayList<>(lostBookieRecoveryDelayCallbacks);
lostBookieRecoveryDelayCallbacks.clear();
}
for (BookkeeperInternalCallbacks.GenericCallback<Void> callback : callbackList) {
try {
callback.operationComplete(0, null);
} catch (Exception e) {
log.warn("lostBookieRecoveryDelayCallbacks handle error", e);
}
} else if (n.getPath().equals(lostBookieRecoveryDelayPath)) {
if (lostBookieRecoveryDelayListener != null) {
lostBookieRecoveryDelayListener.operationComplete(0, null);
}
return;
}
if (replicationDisablePath.equals(n.getPath()) && n.getType() == NotificationType.Deleted) {
log.info("LedgerReplication is enabled externally through MetadataStore, "
+ "since DISABLE_NODE ZNode is deleted");
final List<BookkeeperInternalCallbacks.GenericCallback<Void>> callbackList;
synchronized (replicationEnabledCallbacks) {
callbackList = new ArrayList<>(replicationEnabledCallbacks);
replicationEnabledCallbacks.clear();
}
for (BookkeeperInternalCallbacks.GenericCallback<Void> callback : callbackList) {
try {
callback.operationComplete(0, null);
} catch (Exception e) {
log.warn("replicationEnabledCallbacks handle error", e);
}
}
}
Expand Down Expand Up @@ -688,8 +709,7 @@ public void disableLedgerReplication()
log.debug("disableLedegerReplication()");
}
try {
String path = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
store.put(path, "".getBytes(UTF_8), Optional.of(-1L))
store.put(replicationDisablePath, "".getBytes(UTF_8), Optional.of(-1L))
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
log.info("Auto ledger re-replication is disabled!");
} catch (ExecutionException | TimeoutException ee) {
Expand All @@ -710,7 +730,7 @@ public void enableLedgerReplication()
log.debug("enableLedegerReplication()");
}
try {
store.delete(basePath + '/' + BookKeeperConstants.DISABLE_NODE, Optional.empty())
store.delete(replicationDisablePath, Optional.empty())
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
log.info("Resuming automatic ledger re-replication");
} catch (ExecutionException | TimeoutException ee) {
Expand All @@ -731,7 +751,7 @@ public boolean isLedgerReplicationEnabled()
log.debug("isLedgerReplicationEnabled()");
}
try {
return !store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE)
return !store.exists(replicationDisablePath)
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (ExecutionException | TimeoutException ee) {
log.error("Error while checking the state of "
Expand All @@ -751,13 +771,11 @@ public void notifyLedgerReplicationEnabled(final BookkeeperInternalCallbacks.Gen
if (log.isDebugEnabled()) {
log.debug("notifyLedgerReplicationEnabled()");
}

synchronized (this) {
replicationEnabledListener = cb;
synchronized (replicationEnabledCallbacks) {
replicationEnabledCallbacks.add(cb);
}

try {
if (!store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE)
if (!store.exists(replicationDisablePath)
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
log.info("LedgerReplication is enabled externally through metadata store, "
+ "since DISABLE_NODE node is deleted");
Expand Down Expand Up @@ -851,8 +869,8 @@ public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableE
public void notifyLostBookieRecoveryDelayChanged(BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws
ReplicationException.UnavailableException {
log.debug("notifyLostBookieRecoveryDelayChanged()");
synchronized (this) {
lostBookieRecoveryDelayListener = cb;
synchronized (lostBookieRecoveryDelayCallbacks) {
lostBookieRecoveryDelayCallbacks.add(cb);
}
try {
if (!store.exists(lostBookieRecoveryDelayPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
Expand Down
Loading

0 comments on commit 8ff51eb

Please sign in to comment.