Skip to content

Commit

Permalink
Fix errors while revalidating locks in Releasing state
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Apr 18, 2024
1 parent a766686 commit 70f52b1
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
} else {
callback.completeExceptionally(
new RestException(Status.CONFLICT, "The broker still have in-flight topics"
+ " created during namespace deletion, please try again."));
+ " created during namespace deletion or some bundle is still owned, please try again."));
// drop out recursive
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
*/
public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
ResourceLock<NamespaceEphemeralData> lock = locallyAcquiredLocks.remove(bundle);
log.info("Removing ownership of {} (current lock is {})", bundle, lock);
if (lock == null) {
// We don't own the specified bundle anymore
return CompletableFuture.completedFuture(null);
}

return lock.release();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
boolean readerCachedAndNotFailed = readerCompletableFuture != null
&& readerCompletableFuture.isDone()
&& !readerCompletableFuture.isCompletedExceptionally();
boolean completedExceptionally = readerCompletableFuture != null
&& readerCompletableFuture.isCompletedExceptionally();
log.info("[{}] addOwnedNamespaceBundleAsync, readerCachedAndNotFailed: {}, completedExceptionally: {}", namespace,
readerCachedAndNotFailed, completedExceptionally);
if (readerCachedAndNotFailed) {
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
if (policiesResult.isPresent()) {
Policies policies = policiesResult.get();
if (!allowDeletedNamespace && policies.deleted) {
String msg = String.format("Namespace %s is deleted", namespace.toString());
String msg = String.format("Namespace %s is marked as deleted", namespace.toString());
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, msg));
} else if (policies.replication_clusters.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public CompletableFuture<Optional<CacheGetResult<T>>> asyncReload(
if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) {
return readValueFromStore(key);
} else {
log.error("Cannot refresh cache item for key {} because we're not connected to the metadata store", key);
// Do not try to refresh the cache item if we know that we're not connected to the
// metadata store
return CompletableFuture.completedFuture(oldValue);
Expand Down Expand Up @@ -216,6 +217,7 @@ public CompletableFuture<Void> create(String path, T value) {
}

CompletableFuture<Void> future = new CompletableFuture<>();
log.info("Creating path {} with value {}", path, value);
store.put(path, content, Optional.of(-1L))
.thenAccept(stat -> {
// Make sure we have the value cached before the operation is completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public synchronized CompletableFuture<Void> updateValue(T newValue) {
return sequencer.sequential(() -> {
synchronized (ResourceLockImpl.this) {
if (state != State.Valid) {
log.error("Cannot update value on lock at {} because it's not in valid state: {}", path, state,
new Exception("invalid lock state " + state + " at " + path).fillInStackTrace());
return CompletableFuture.failedFuture(
new IllegalStateException("Lock was not in valid state: " + state));
}
Expand Down Expand Up @@ -210,12 +212,17 @@ synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
* This method is thread-safe and it will perform multiple re-validation operations in turn.
*/
synchronized CompletableFuture<Void> silentRevalidateOnce() {
if (state == State.Releasing) {
log.info("Lock on resource {} is being released. Skip revalidation", path);
return CompletableFuture.completedFuture(null);
}
return sequencer.sequential(() -> revalidate(value))
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof BadVersionException || realCause instanceof LockBusyException) {
if (realCause instanceof BadVersionException
|| realCause instanceof LockBusyException) {
log.warn("Failed to revalidate the lock at {}. Marked as expired. {}",
path, realCause.getMessage());
state = State.Released;
Expand All @@ -237,7 +244,7 @@ private synchronized CompletableFuture<Void> revalidate(T newValue) {
// Since the distributed lock has been expired, we don't need to revalidate it.
if (state != State.Valid && state != State.Init) {
return CompletableFuture.failedFuture(
new IllegalStateException("Lock was not in valid state: " + state));
new IllegalStateException("Lock for " + path + " was not in valid state: " + state));
}
if (log.isDebugEnabled()) {
log.debug("doRevalidate with newValue={}, version={}", newValue, version);
Expand Down

0 comments on commit 70f52b1

Please sign in to comment.