diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 00e381e07292f..f31e5a6b78a65 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -34,6 +34,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.metadata.api.MetadataCacheConfig; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -58,13 +60,35 @@ public class BaseResources { public BaseResources(MetadataStore store, Class clazz, int operationTimeoutSec) { this.store = store; - this.cache = store.getMetadataCache(clazz); + this.cache = store.getMetadataCache(clazz, MetadataCacheConfig.builder() + .retryBackoff(newRetryBackoff(operationTimeoutSec)) + .build()); this.operationTimeoutSec = operationTimeoutSec; } public BaseResources(MetadataStore store, TypeReference typeRef, int operationTimeoutSec) { this.store = store; - this.cache = store.getMetadataCache(typeRef); + this.cache = store.getMetadataCache(typeRef, MetadataCacheConfig.builder() + .retryBackoff(newRetryBackoff(operationTimeoutSec)) + .build()); this.operationTimeoutSec = operationTimeoutSec; } + + /** + * Creates the BadVersion retry backoff for this resource's metadata cache. + * + * The initial and max delays are read from {@link MetadataCacheConfig#DEFAULT_RETRY_BACKOFF_BUILDER} and copied into + * a new builder, so the shared static default is never modified (its fluent setters presumably mutate in place). + * + * @param operationTimeoutSec retry budget in seconds, applied as the mandatory stop + * @return a builder that is not shared with any other cache + */ + private static BackoffBuilder newRetryBackoff(int operationTimeoutSec) { + // The getters of the created Backoff report milliseconds. + final var defaults = MetadataCacheConfig.DEFAULT_RETRY_BACKOFF_BUILDER.create(); + return new BackoffBuilder() + .setInitialTime(defaults.getInitial(), TimeUnit.MILLISECONDS) + .setMax(defaults.getMax(), TimeUnit.MILLISECONDS) + .setMandatoryStop(operationTimeoutSec, TimeUnit.SECONDS); + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java index 4eab85f3c41be..d7381792f60e8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java @@ -18,7 +18,6 @@ */ package 
org.apache.pulsar.common.util; -import com.google.common.annotations.VisibleForTesting; import java.time.Clock; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -95,11 +94,6 @@ public void reset() { this.mandatoryStopMade = false; } - @VisibleForTesting - long getFirstBackoffTimeInMillis() { - return firstBackoffTimeInMillis; - } - public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts, long defaultInterval, long maxBackoffInterval) { long initialTimestampInNano = unitInitial.toNanos(initialTimestamp); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java index b3786236a70ef..ccac4d0e5a13b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java @@ -136,6 +136,7 @@ public void mandatoryStopTest() { // would have been 1600 w/o the mandatory stop assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + /* the 400 ms wait checked just above is the one shortened by the mandatory stop, so the flag is expected to be set from here on */ assertTrue(backoff.isMandatoryStopMade()); Mockito.when(mockClock.millis()).thenReturn(1900L); assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200)); Mockito.when(mockClock.millis()).thenReturn(3200L); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java index 2bc042aebb308..641d4bde5a23b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java @@ -24,6 +24,7 @@ import lombok.Builder; import lombok.Getter; import lombok.ToString; +import org.apache.pulsar.common.util.BackoffBuilder; /** * The configuration builder for a {@link MetadataCache} config. 
@@ -33,6 +34,10 @@ @ToString public class MetadataCacheConfig { private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); + /** Default retry backoff: 5 ms initial delay, 3 s max delay, 30 s mandatory stop. This single instance is the default value of {@code retryBackoff}. NOTE(review): the builder looks mutable, so callers should not invoke setters on it - confirm. */ public static final BackoffBuilder DEFAULT_RETRY_BACKOFF_BUILDER = + new BackoffBuilder().setInitialTime(5, TimeUnit.MILLISECONDS) + .setMax(3, TimeUnit.SECONDS) + .setMandatoryStop(30, TimeUnit.SECONDS); /** * Specifies that active entries are eligible for automatic refresh once a fixed duration has @@ -57,4 +62,7 @@ public class MetadataCacheConfig { @Builder.Default private final BiConsumer>> asyncReloadConsumer = null; + /** Builder for the backoff applied between retries of an update that failed with a version conflict; defaults to {@link #DEFAULT_RETRY_BACKOFF_BUILDER}. */ @Builder.Default + private final BackoffBuilder retryBackoff = DEFAULT_RETRY_BACKOFF_BUILDER; + } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 4c7f34aa5c16e..66c8388c83bd2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -30,13 +30,16 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.metadata.api.CacheGetResult; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; @@ -58,18 +61,23 @@ public class MetadataCacheImpl implements MetadataCache, Consumer serde; + /* executor on which retries of conflicting updates are scheduled */ private final ScheduledExecutorService executor; + /* kept so that the retry backoff can be created per operation */ private final MetadataCacheConfig 
cacheConfig; private final AsyncLoadingCache>> objCache; - public MetadataCacheImpl(MetadataStore store, TypeReference typeRef, MetadataCacheConfig cacheConfig) { - this(store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig); + public MetadataCacheImpl(MetadataStore store, TypeReference typeRef, MetadataCacheConfig cacheConfig, + ScheduledExecutorService executor) { + this(store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor); } - public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig cacheConfig) { - this(store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig); + public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig cacheConfig, + ScheduledExecutorService executor) { + this(store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor); } - public MetadataCacheImpl(MetadataStore store, MetadataSerde serde, MetadataCacheConfig cacheConfig) { + public MetadataCacheImpl(MetadataStore store, MetadataSerde serde, MetadataCacheConfig cacheConfig, + ScheduledExecutorService executor) { this.store = store; if (store instanceof MetadataStoreExtended) { this.storeExtended = (MetadataStoreExtended) store; @@ -77,6 +85,8 @@ public MetadataCacheImpl(MetadataStore store, MetadataSerde serde, MetadataCa this.storeExtended = null; } this.serde = serde; + this.cacheConfig = cacheConfig; + this.executor = executor; Caffeine cacheBuilder = Caffeine.newBuilder(); if (cacheConfig.getRefreshAfterWriteMillis() > 0) { 
@@ -321,22 +331,60 @@ public void accept(Notification t) { } } - private CompletableFuture executeWithRetry(Supplier> op, String key) { - CompletableFuture result = new CompletableFuture<>(); + /** + * Runs {@code op} and, each time it fails with {@link BadVersionException}, invalidates {@code key} and + * re-schedules the attempt on {@code executor} after the delay returned by {@code backoff.next()}. + * + * @param op supplies a fresh attempt on every invocation + * @param key cache key whose entry is invalidated before a retry + * @param result completed with the first success, with the cause of the first non-BadVersion failure, or with a + * {@link TimeoutException} once {@code backoff} reports that its mandatory stop was made + * @param backoff backoff state owned by this single operation + * @param startTimeMillis {@code System.currentTimeMillis()} taken when the operation was first attempted + */ + private void execute(Supplier> op, String key, CompletableFuture result, Backoff backoff, + long startTimeMillis) { op.get().thenAccept(result::complete).exceptionally((ex) -> { if (ex.getCause() instanceof BadVersionException) { - // if resource is updated by other than metadata-cache then metadata-cache will get bad-version - // exception. so, try to invalidate the cache and try one more time. + // The resource was changed by a writer other than this metadata-cache, so the cached version is stale: + // invalidate the entry and retry with backoff until the mandatory stop. objCache.synchronous().invalidate(key); - op.get().thenAccept(result::complete).exceptionally((ex1) -> { - result.completeExceptionally(ex1.getCause()); + // Elapsed time is measured from the first attempt. NOTE(review): the backoff's own first-backoff timestamp + // looks unset until next() has been called once, which would inflate the first reported value - confirm. + long elapsed = System.currentTimeMillis() - startTimeMillis; + if (backoff.isMandatoryStopMade()) { + result.completeExceptionally(new TimeoutException( + String.format("Timeout to update key %s. Elapsed time: %d ms", key, elapsed))); return null; - }); + } + final var next = backoff.next(); + log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key, + next, backoff.isMandatoryStopMade(), elapsed); + try { + executor.schedule(() -> execute(op, key, result, backoff, startTimeMillis), next, + TimeUnit.MILLISECONDS); + } catch (RuntimeException e) { + // An exception thrown from this callback would be swallowed and leave result incomplete forever. + result.completeExceptionally(e); + } return null; } result.completeExceptionally(ex.getCause()); return null; }); + } + + /** + * Executes {@code op}, retrying on {@link BadVersionException} according to the cache's configured retry backoff. + * + * @param op supplies a fresh attempt on every invocation + * @param key cache key the operation updates + * @return a future holding the first successful result, or the failure that ended the retries + */ + private CompletableFuture executeWithRetry(Supplier> op, String key) { + final var backoff = cacheConfig.getRetryBackoff().create(); + CompletableFuture result = new CompletableFuture<>(); + execute(op, key, result, backoff, System.currentTimeMillis()); return result; } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index c458d0da2146a..f63aa1c036d88 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -236,21 +236,21 @@ protected boolean shouldIgnoreEvent(MetadataEvent event, GetResult existingValue @Override public MetadataCache getMetadataCache(Class clazz, MetadataCacheConfig cacheConfig) { MetadataCacheImpl metadataCache = new MetadataCacheImpl(this, - TypeFactory.defaultInstance().constructSimpleType(clazz, null), cacheConfig); + TypeFactory.defaultInstance().constructSimpleType(clazz, null), cacheConfig, this.executor); 
metadataCaches.add(metadataCache); return metadataCache; } @Override public MetadataCache getMetadataCache(TypeReference typeRef, MetadataCacheConfig cacheConfig) { - MetadataCacheImpl metadataCache = new MetadataCacheImpl(this, typeRef, cacheConfig); + MetadataCacheImpl metadataCache = new MetadataCacheImpl(this, typeRef, cacheConfig, this.executor); metadataCaches.add(metadataCache); return metadataCache; } @Override public MetadataCache getMetadataCache(MetadataSerde serde, MetadataCacheConfig cacheConfig) { - MetadataCacheImpl metadataCache = new MetadataCacheImpl<>(this, serde, cacheConfig); + MetadataCacheImpl metadataCache = new MetadataCacheImpl<>(this, serde, cacheConfig, this.executor); metadataCaches.add(metadataCache); return metadataCache; } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index 6992c69b7252e..ddd975e422ab8 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.metadata; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotSame; @@ -26,7 +29,9 @@ import static org.testng.Assert.fail; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.type.TypeReference; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.EnumSet; @@ -36,6 +41,8 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.AllArgsConstructor; @@ -44,6 +51,8 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.util.BackoffBuilder; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.CacheGetResult; import org.apache.pulsar.metadata.api.MetadataCache; @@ -51,6 +60,7 @@ import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; @@ -60,6 +70,7 @@ import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.awaitility.Awaitility; +import org.mockito.stubbing.Answer; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -488,32 +499,81 @@ public void readModifyUpdate(String provider, Supplier urlSupplier) thro public void readModifyUpdateBadVersionRetry() throws Exception { String url = zks.getConnectionString(); @Cleanup - MetadataStore sourceStore1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); - @Cleanup - MetadataStore sourceStore2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); - MetadataCache 
objCache1 = sourceStore1.getMetadataCache(MyClass.class); - MetadataCache objCache2 = sourceStore2.getMetadataCache(MyClass.class); + MetadataCache cache = store.getMetadataCache(MyClass.class); String key1 = newKey(); MyClass value1 = new MyClass("a", 1); - objCache1.create(key1, value1).join(); - assertEquals(objCache1.get(key1).join().get().b, 1); + cache.create(key1, value1).join(); + assertEquals(cache.get(key1).join().get().b, 1); - CompletableFuture future1 = objCache1.readModifyUpdate(key1, v -> { - return new MyClass(v.a, v.b + 1); - }); - - CompletableFuture future2 = objCache2.readModifyUpdate(key1, v -> { - return new MyClass(v.a, v.b + 1); - }); + final var futures = new ArrayList>(); + final var sourceStores = new ArrayList(); - MyClass myClass1 = future1.join(); - assertEquals(myClass1.b, 2); + final int writers = 20; + try { + for (int i = 0; i < writers; i++) { + final var sourceStore = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + sourceStores.add(sourceStore); + final var objCache = sourceStore.getMetadataCache(MyClass.class); + futures.add(objCache.readModifyUpdate(key1, v -> new MyClass(v.a, v.b + 1))); + } + FutureUtil.waitForAll(futures).join(); + // Each writer adds exactly 1, so any lost update would leave the value below 1 + writers. + Awaitility.await().untilAsserted(() -> assertEquals(cache.get(key1).join().get().b, 1 + writers)); + } finally { + // Close the per-writer stores even when an update fails, so the test does not leak them. + for (var sourceStore : sourceStores) { + sourceStore.close(); + } + } + } - MyClass myClass2 = future2.join(); - assertEquals(myClass2.b, 3); + @Test + public void readModifyUpdateOrCreateRetryTimeout() throws Exception { + String url = zks.getConnectionString(); + @Cleanup + MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + MetadataCache cache = store.getMetadataCache(MyClass.class, MetadataCacheConfig.builder() + .retryBackoff(new BackoffBuilder() + .setInitialTime(5, TimeUnit.MILLISECONDS) + .setMax(1, TimeUnit.SECONDS) + .setMandatoryStop(3, TimeUnit.SECONDS)).build()); + + Field metadataCacheField = cache.getClass().getDeclaredField("objCache"); + metadataCacheField.setAccessible(true); + var objCache = metadataCacheField.get(cache); + var spyObjCache = 
(AsyncLoadingCache) spy(objCache); + doAnswer((Answer>) invocation -> CompletableFuture.failedFuture( + new MetadataStoreException.BadVersionException(""))).when(spyObjCache).get(any()); + metadataCacheField.set(cache, spyObjCache); + + // Test three times to ensure that the retry works each time. + for (int i = 0; i < 3; i++) { + var start = System.currentTimeMillis(); + boolean timedOut = false; + try { + cache.readModifyUpdateOrCreate(newKey(), Optional::get).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + var elapsed = System.currentTimeMillis() - start; + // Each wait is shortened by a random amount of (presumably) up to 10%, so the total elapsed time should + // be at least mandatoryStopTime - maxTime * 0.1, which is 2900ms. + assertTrue(elapsed >= 2900L, + "The elapsed time should be greater than the timeout. But now it's " + elapsed); + // The elapsed time should be less than the timeout. The 1.5 factor allows for some extra time. + assertTrue(elapsed < 3000L * 1.5, + "The retry should have been stopped after the timeout. But now it's " + elapsed); + timedOut = true; + } else { + fail("Should have failed with TimeoutException, but failed with " + e.getCause()); + } + } + assertTrue(timedOut, "Should have failed with TimeoutException, but succeeded"); + } } @Test(dataProvider = "impl") @@ -647,4 +707,15 @@ public void testAsyncReloadConsumer(String provider, Supplier urlSupplie refreshed.contains(value2); }); } + + @Test + public void testDefaultMetadataCacheConfig() { + final var config = MetadataCacheConfig.builder().build(); + assertEquals(config.getRefreshAfterWriteMillis(), TimeUnit.MINUTES.toMillis(5)); + assertEquals(config.getExpireAfterWriteMillis(), TimeUnit.MINUTES.toMillis(10)); + final var backoff = config.getRetryBackoff().create(); + assertEquals(backoff.getInitial(), 5); + assertEquals(backoff.getMax(), 3000); + assertEquals(backoff.getMandatoryStop(), 30_000); + } }