From 70f52b168527482834776ccd3e0436d1ebea4838 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 18 Apr 2024 15:07:40 +0200 Subject: [PATCH] Fix errors while revalidating locks in Releasing state --- .../pulsar/broker/admin/impl/NamespacesBase.java | 2 +- .../pulsar/broker/namespace/OwnershipCache.java | 2 +- .../service/SystemTopicBasedTopicPoliciesService.java | 4 ++++ .../apache/pulsar/broker/web/PulsarWebResource.java | 2 +- .../pulsar/metadata/cache/impl/MetadataCacheImpl.java | 2 ++ .../metadata/coordination/impl/ResourceLockImpl.java | 11 +++++++++-- 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 9d4325fc24a33..b55c9490242e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -345,7 +345,7 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime } else { callback.completeExceptionally( new RestException(Status.CONFLICT, "The broker still have in-flight topics" - + " created during namespace deletion, please try again.")); + + " created during namespace deletion or some bundle is still owned, please try again.")); // drop out recursive } return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 0033abf36c78c..8ebc0db876fdf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -208,11 +208,11 @@ public CompletableFuture tryAcquiringOwnership(Namespace */ public CompletableFuture removeOwnership(NamespaceBundle bundle) { ResourceLock lock = locallyAcquiredLocks.remove(bundle); + log.info("Removing ownership of {} (current lock is {})", bundle, lock); if (lock == null) { // We don't own the specified bundle anymore return CompletableFuture.completedFuture(null); } - return lock.release(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 4abff08a2ca6d..6fb4a8233931b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -320,6 +320,10 @@ public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle name boolean readerCachedAndNotFailed = readerCompletableFuture != null && readerCompletableFuture.isDone() && !readerCompletableFuture.isCompletedExceptionally(); + boolean completedExceptionally = readerCompletableFuture != null + && readerCompletableFuture.isCompletedExceptionally(); + log.info("[{}] addOwnedNamespaceBundleAsync, readerCachedAndNotFailed: {}, completedExceptionally: {}", namespace, + readerCachedAndNotFailed, completedExceptionally); if (readerCachedAndNotFailed) { ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 909bc1f9ca455..1aa9346fc1070 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -898,7 +898,7 @@ public static CompletableFuture checkLocalOrGetPeerReplicationC if (policiesResult.isPresent()) { Policies policies = policiesResult.get(); if (!allowDeletedNamespace && policies.deleted) { - String msg = String.format("Namespace %s is deleted", namespace.toString()); + String msg = String.format("Namespace %s is marked as deleted", namespace.toString()); log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, msg)); } else if (policies.replication_clusters.isEmpty()) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b9051a7dc7df4..1f11e67be51b6 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -91,6 +91,7 @@ public CompletableFuture>> asyncReload( if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) { return readValueFromStore(key); } else { + log.error("Cannot refresh cache item for key {} because we're not connected to the metadata store", key); // Do not try to refresh the cache item if we know that we're not connected to the // metadata store return CompletableFuture.completedFuture(oldValue); @@ -216,6 +217,7 @@ public CompletableFuture create(String path, T value) { } CompletableFuture future = new CompletableFuture<>(); + log.info("Creating path {} with value {}", path, value); store.put(path, content, Optional.of(-1L)) .thenAccept(stat -> { // Make sure we have the value cached before the operation is completed diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java index 93c994b2436b9..59bd3f43d88ff 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java @@ -77,6 +77,8 @@ public synchronized CompletableFuture updateValue(T newValue) { return sequencer.sequential(() -> { synchronized (ResourceLockImpl.this) { if (state != State.Valid) { + log.error("Cannot update value on lock at {} because it's not in valid state: {}", path, state, + new Exception("invalid lock state " + state + " at " + path).fillInStackTrace()); return CompletableFuture.failedFuture( new IllegalStateException("Lock was not in valid state: " + state)); } @@ -210,12 +212,17 @@ synchronized CompletableFuture revalidateIfNeededAfterReconnection() { * This method is thread-safe and it will perform multiple re-validation operations in turn. */ synchronized CompletableFuture silentRevalidateOnce() { + if (state == State.Releasing) { + log.info("Lock on resource {} is being released. Skip revalidation", path); + return CompletableFuture.completedFuture(null); + } return sequencer.sequential(() -> revalidate(value)) .thenRun(() -> log.info("Successfully revalidated the lock on {}", path)) .exceptionally(ex -> { synchronized (ResourceLockImpl.this) { Throwable realCause = FutureUtil.unwrapCompletionException(ex); - if (realCause instanceof BadVersionException || realCause instanceof LockBusyException) { + if (realCause instanceof BadVersionException + || realCause instanceof LockBusyException) { log.warn("Failed to revalidate the lock at {}. Marked as expired. {}", path, realCause.getMessage()); state = State.Released; @@ -237,7 +244,7 @@ private synchronized CompletableFuture revalidate(T newValue) { // Since the distributed lock has been expired, we don't need to revalidate it. if (state != State.Valid && state != State.Init) { return CompletableFuture.failedFuture( - new IllegalStateException("Lock was not in valid state: " + state)); + new IllegalStateException("Lock for " + path + " was not in valid state: " + state)); } if (log.isDebugEnabled()) { log.debug("doRevalidate with newValue={}, version={}", newValue, version);