From b2f2c3f491d64c756243ea2092f4224538cafe25 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Sat, 11 May 2024 17:38:52 +0800 Subject: [PATCH] [fix][broker] Fix cursor should use latest ledger config (#22644) Signed-off-by: Zixuan Liu (cherry picked from commit 774a5d42e8342ee50395cf3626b9e7af27da849e) Signed-off-by: Zixuan Liu --- .../mledger/impl/ManagedCursorImpl.java | 67 ++++++++++--------- .../mledger/impl/ManagedCursorMXBeanImpl.java | 3 +- .../mledger/impl/ManagedLedgerImpl.java | 8 +-- .../mledger/impl/NonDurableCursorImpl.java | 5 +- .../bookkeeper/mledger/impl/OpReadEntry.java | 2 +- .../mledger/impl/ReadOnlyCursorImpl.java | 5 +- .../impl/ReadOnlyManagedLedgerImpl.java | 2 +- ...edCursorIndividualDeletedMessagesTest.java | 3 +- .../mledger/impl/ManagedCursorTest.java | 4 +- .../mledger/impl/ManagedLedgerTest.java | 2 +- .../service/BrokerBkEnsemblesTests.java | 8 +-- .../persistent/PersistentTopicTest.java | 32 ++++++++- 12 files changed, 83 insertions(+), 58 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8867d81cb1b6c1..9924e2f6e2dca4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -107,7 +107,6 @@ public class ManagedCursorImpl implements ManagedCursor { protected final BookKeeper bookkeeper; - protected final ManagedLedgerConfig config; protected final ManagedLedgerImpl ledger; private final String name; private volatile Map cursorProperties; @@ -282,32 +281,31 @@ public interface VoidCallback { void operationFailed(ManagedLedgerException exception); } - ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) { + ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName) { this.bookkeeper = bookkeeper; this.cursorProperties = Collections.emptyMap(); - this.config = config; this.ledger = ledger; this.name = cursorName; - this.individualDeletedMessages = config.isUnackedRangesOpenCacheSetEnabled() + this.individualDeletedMessages = getConfig().isUnackedRangesOpenCacheSetEnabled() ? new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter) : new LongPairRangeSet.DefaultRangeSet<>(positionRangeConverter); - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { this.batchDeletedIndexes = new ConcurrentSkipListMap<>(); } else { this.batchDeletedIndexes = null; } - this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); + this.digestType = BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType()); STATE_UPDATER.set(this, State.Uninitialized); PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0); PENDING_READ_OPS_UPDATER.set(this, 0); RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE); WAITING_READ_OP_UPDATER.set(this, null); - this.clock = config.getClock(); + this.clock = getConfig().getClock(); this.lastActive = this.clock.millis(); this.lastLedgerSwitchTimestamp = this.clock.millis(); - if (config.getThrottleMarkDelete() > 0.0) { - markDeleteLimiter = RateLimiter.create(config.getThrottleMarkDelete()); + if (getConfig().getThrottleMarkDelete() > 0.0) { + markDeleteLimiter = RateLimiter.create(getConfig().getThrottleMarkDelete()); } else { // Disable mark-delete rate limiter markDeleteLimiter = null; @@ -537,7 +535,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac if (positionInfo.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } - if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null + if (getConfig().isDeletionAtBatchIndexLevelEnabled() && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } @@ -546,7 +544,8 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac }, null); }; try { - bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null); + bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback, + null); } catch (Throwable t) { log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", ledger.getName(), ledgerId, name, t); @@ -878,10 +877,10 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie // Check again for new entries after the configured time, then if still no entries are available register // to be notified - if (config.getNewEntriesCheckDelayInMillis() > 0) { + if (getConfig().getNewEntriesCheckDelayInMillis() > 0) { ledger.getScheduledExecutor() .schedule(() -> checkForNewEntries(op, callback, ctx), - config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); + getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); } else { // If there's no delay, check directly from the same thread checkForNewEntries(op, callback, ctx); @@ -1201,7 +1200,7 @@ public void operationComplete() { lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), null, null); individualDeletedMessages.clear(); - if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle); batchDeletedIndexes.clear(); long[] resetWords = newReadPosition.ackSet; @@ -1466,7 +1465,7 @@ protected long getNumberOfEntries(Range range) { lock.readLock().lock(); try { - if (config.isUnackedRangesOpenCacheSetEnabled()) { + if (getConfig().isUnackedRangesOpenCacheSetEnabled()) { int cardinality = individualDeletedMessages.cardinality( range.lowerEndpoint().ledgerId, range.lowerEndpoint().entryId, range.upperEndpoint().ledgerId, range.upperEndpoint().entryId); @@ -1829,7 +1828,7 @@ public void asyncMarkDelete(final Position position, Map propertie PositionImpl newPosition = (PositionImpl) position; - if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { if (newPosition.ackSet != null) { batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet)); newPosition = ledger.getPreviousPosition(newPosition); @@ -1992,7 +1991,7 @@ public void operationComplete() { try { individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()); - if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { Map subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true); @@ -2120,8 +2119,8 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()) - || position.compareTo(markDeletePosition) <= 0) { - if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + || position.compareTo(markDeletePosition) <= 0) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { bitSetRecyclable.recycle(); @@ -2133,7 +2132,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb continue; } if (position.ackSet == null) { - if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { bitSetRecyclable.recycle(); @@ -2150,7 +2149,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name, individualDeletedMessages); } - } else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + } else if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet); BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet); if (givenBitSet != bitSet) { @@ -2480,8 +2479,8 @@ public void operationFailed(MetaStoreException e) { private boolean shouldPersistUnackRangesToLedger() { return cursorLedger != null && !isCursorLedgerReadOnly - && config.getMaxUnackedRangesToPersist() > 0 - && individualDeletedMessages.size() > config.getMaxUnackedRangesToPersistInZk(); + && getConfig().getMaxUnackedRangesToPersist() > 0 + && individualDeletedMessages.size() > getConfig().getMaxUnackedRangesToPersistInZk(); } private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map properties, @@ -2504,7 +2503,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties)); if (persistIndividualDeletedMessageRanges) { info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()); - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList()); } } @@ -2681,7 +2680,7 @@ void internalFlushPendingMarkDeletes() { void createNewMetadataLedger(final VoidCallback callback) { ledger.mbean.startCursorLedgerCreateOp(); - ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> { + ledger.asyncCreateLedger(bookkeeper, getConfig(), digestType, (rc, lh, ctx) -> { if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) { return; @@ -2802,7 +2801,8 @@ private List buildIndividualDeletedMessageRanges() { MessageRange messageRange = messageRangeBuilder.build(); acksSerializedSize.addAndGet(messageRange.getSerializedSize()); rangeList.add(messageRange); - return rangeList.size() <= config.getMaxUnackedRangesToPersist(); + + return rangeList.size() <= getConfig().getMaxUnackedRangesToPersist(); }); this.individualDeletedMessagesSerializedSize = acksSerializedSize.get(); return rangeList; @@ -2814,8 +2814,7 @@ private List buildIndividualDeletedMessageRanges() { private List buildBatchEntryDeletionIndexInfoList() { lock.readLock().lock(); try { - if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null - || batchDeletedIndexes.isEmpty()) { + if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) { return Collections.emptyList(); } MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo @@ -2824,7 +2823,7 @@ private List buildBatchEntryDeletio .BatchedEntryDeletionIndexInfo.newBuilder(); List result = Lists.newArrayList(); Iterator> iterator = batchDeletedIndexes.entrySet().iterator(); - while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) { + while (iterator.hasNext() && result.size() < getConfig().getMaxBatchDeletedIndexToPersist()) { Map.Entry entry = iterator.next(); nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId()); nestedPositionBuilder.setEntryId(entry.getKey().getEntryId()); @@ -2913,8 +2912,8 @@ public void operationFailed(MetaStoreException e) { boolean shouldCloseLedger(LedgerHandle lh) { long now = clock.millis(); if (ledger.getFactory().isMetadataServiceAvailable() - && (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger() - || lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000)) + && (lh.getLastAddConfirmed() >= getConfig().getMetadataMaxEntriesPerLedger() + || lastLedgerSwitchTimestamp < (now - getConfig().getLedgerRolloverTimeout() * 1000)) && (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) { // It's safe to modify the timestamp since this method will be only called from a callback, implying that // calls will be serialized on one single thread @@ -3271,7 +3270,7 @@ private ManagedCursorImpl cursorImpl() { @Override public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) { - if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSet = batchDeletedIndexes.get(position); return bitSet == null ? null : bitSet.toLongArray(); } else { @@ -3355,5 +3354,9 @@ public void setState(State state) { this.state = state; } + public ManagedLedgerConfig getConfig() { + return getManagedLedger().getConfig(); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java index fd9dcc1e17f1c4..62a5df95d24c03 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java @@ -90,7 +90,8 @@ public long getPersistZookeeperErrors() { @Override public void addWriteCursorLedgerSize(final long size) { - writeCursorLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize()); + writeCursorLedgerSize.add( + size * managedCursor.getManagedLedger().getConfig().getWriteQuorumSize()); writeCursorLedgerLogicalSize.add(size); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0515fd21131419..cac1e7d9a3e19d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -554,7 +554,7 @@ public void operationComplete(List consumers, Stat s) { for (final String cursorName : consumers) { log.info("[{}] Loading cursor {}", name, cursorName); final ManagedCursorImpl cursor; - cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName); + cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName); cursor.recover(new VoidCallback() { @Override @@ -585,7 +585,7 @@ public void operationFailed(ManagedLedgerException exception) { log.debug("[{}] Recovering cursor {} lazily", name, cursorName); } final ManagedCursorImpl cursor; - cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName); + cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName); CompletableFuture cursorRecoveryFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorRecoveryFuture); @@ -967,7 +967,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP if (log.isDebugEnabled()) { log.debug("[{}] Creating new cursor: {}", name, cursorName); } - final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName); + final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, this, cursorName); CompletableFuture cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition(); @@ -1101,7 +1101,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu return cachedCursor; } - NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName, + NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, this, cursorName, (PositionImpl) startCursorPosition, initialPosition, isReadCompacted); cursor.setActive(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index a171c24e2dbb2c..e03000484f7469 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -25,7 +25,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.slf4j.Logger; @@ -35,10 +34,10 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { private final boolean readCompacted; - NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName, + NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName, PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition, boolean isReadCompacted) { - super(bookkeeper, config, ledger, cursorName); + super(bookkeeper, ledger, cursorName); this.readCompacted = isReadCompacted; // Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index b8018db511a194..8f4d6e1ec30946 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -97,7 +97,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { callback.readEntriesComplete(entries, ctx); recycle(); })); - } else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) { + } else if (cursor.getConfig().isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) { log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(), readPosition, exception.getMessage()); // try to find and move to next valid ledger diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java index 8c20e0d560ee73..7aa25479b47d5e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java @@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; import org.apache.bookkeeper.mledger.proto.MLDataFormats; @@ -30,9 +29,9 @@ @Slf4j public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCursor { - public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, + public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, PositionImpl startPosition, String cursorName) { - super(bookkeeper, config, ledger, cursorName); + super(bookkeeper, ledger, cursorName); if (startPosition.equals(PositionImpl.EARLIEST)) { readPosition = ledger.getFirstPosition().getNext(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java index f1ef7b04857fc8..74f5586367eb60 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java @@ -135,7 +135,7 @@ private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) { } } - return new ReadOnlyCursorImpl(bookKeeper, config, this, startPosition, "read-only-cursor"); + return new ReadOnlyCursorImpl(bookKeeper, this, startPosition, "read-only-cursor"); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java index a5b921cad76f1a..08bcc0973ec8c1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java @@ -56,8 +56,9 @@ void testRecoverIndividualDeletedMessages() throws Exception { ManagedLedgerImpl ledger = mock(ManagedLedgerImpl.class); doReturn(ledgersInfo).when(ledger).getLedgersInfo(); + doReturn(config).when(ledger).getConfig(); - ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, config, ledger, "test-cursor")); + ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, ledger, "test-cursor")); LongPairRangeSet deletedMessages = cursor.getIndividuallyDeletedMessagesSet(); Method recoverMethod = ManagedCursorImpl.class.getDeclaredMethod("recoverIndividualDeletedMessages", diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index e42f46a364ba3d..bc91071febd08c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3122,10 +3122,10 @@ public Object answer(InvocationOnMock invocation) { when(ml.getNextValidLedger(markDeleteLedgerId)).thenReturn(3L); when(ml.getNextValidPosition(lastPosition)).thenReturn(nextPosition); when(ml.ledgerExists(markDeleteLedgerId)).thenReturn(false); + when(ml.getConfig()).thenReturn(new ManagedLedgerConfig()); BookKeeper mockBookKeeper = mock(BookKeeper.class); - final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ml, - cursorName); + final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, ml, cursorName); cursor.recover(new VoidCallback() { @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 2f41ff8e3982e3..41bc804c04ab1f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3137,7 +3137,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..) AtomicReference responseException2 = new AtomicReference<>(); PositionImpl readPositionRef = PositionImpl.EARLIEST; - ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1"); + ManagedCursorImpl cursor = new ManagedCursorImpl(bk, ledger, "cursor1"); OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 1, new ReadEntriesCallback() { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 4eb5803fd615bc..071bffc43d4931 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -209,10 +209,8 @@ public void testSkipCorruptDataLedger() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); - Field configField = ManagedCursorImpl.class.getDeclaredField("config"); - configField.setAccessible(true); // Create multiple data-ledger - ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); + ManagedLedgerConfig config = ml.getConfig(); config.setMaxEntriesPerLedger(entriesPerLedger); config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); // bookkeeper client @@ -322,10 +320,8 @@ public void testTruncateCorruptDataLedger() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); - Field configField = ManagedCursorImpl.class.getDeclaredField("config"); - configField.setAccessible(true); // Create multiple data-ledger - ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); + ManagedLedgerConfig config = ml.getConfig(); config.setMaxEntriesPerLedger(entriesPerLedger); config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); // bookkeeper client diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 085a986e351255..6577caa4c8981a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -27,11 +27,11 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.time.Duration; @@ -42,16 +42,18 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import lombok.Data; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import lombok.Cleanup; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.api.Consumer; @@ -473,4 +475,28 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) return !topic.getManagedLedger().getCursors().iterator().hasNext(); }); } + + @Test + public void testCursorGetConfigAfterTopicPoliciesChanged() throws Exception { + final String topicName = "persistent://prop/ns-abc/" + UUID.randomUUID(); + final String subName = "test_sub"; + + @Cleanup + Consumer subscribe = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + PersistentSubscription subscription = persistentTopic.getSubscription(subName); + + Integer maxConsumers = 100; + admin.topicPolicies().setMaxConsumers(topicName, 100); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getMaxConsumers(topicName, false), maxConsumers); + }); + + ManagedCursorImpl cursor = (ManagedCursorImpl) subscription.getCursor(); + assertEquals(cursor.getConfig(), persistentTopic.getManagedLedger().getConfig()); + + subscribe.close(); + admin.topics().delete(topicName); + } }