Skip to content

Commit

Permalink
[cleanup] Cleanup some duplicated code (apache#23204)
Browse files Browse the repository at this point in the history
  • Loading branch information
zymap authored Aug 20, 2024
1 parent 94e1341 commit a605ea3
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -687,37 +687,7 @@ public Position addEntry(byte[] data, int numberOfMessages) throws InterruptedEx

@Override
public Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
// Result list will contain the status exception and the resulting
// position
class Result {
ManagedLedgerException status = null;
Position position = null;
}
final Result result = new Result();

asyncAddEntry(data, offset, length, new AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
result.position = position;
counter.countDown();
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
result.status = exception;
counter.countDown();
}
}, null);

counter.await();

if (result.status != null) {
log.error("[{}] Error adding entry", name, result.status);
throw result.status;
}

return result.position;
return addEntry(data, 1, offset, length);
}

@Override
Expand Down Expand Up @@ -777,19 +747,7 @@ public void asyncAddEntry(final byte[] data, int numberOfMessages, int offset, i

@Override
public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}

// retain buffer in this thread
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
asyncAddEntry(buffer, 1, callback, ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,12 +683,8 @@ public void updateSubscribeRateLimiter() {
}

private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
if (brokerService.isBrokerEntryMetadataEnabled()) {
ledger.asyncAddEntry(headersAndPayload,
(int) publishContext.getNumberOfMessages(), this, publishContext);
} else {
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
}
ledger.asyncAddEntry(headersAndPayload,
(int) publishContext.getNumberOfMessages(), this, publishContext);
}

public void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeast;
Expand Down Expand Up @@ -294,11 +295,11 @@ public void testPublishMessage() throws Exception {

doAnswer(invocationOnMock -> {
final ByteBuf payload = (ByteBuf) invocationOnMock.getArguments()[0];
final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[1];
final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[2];
final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[2];
final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[3];
callback.addComplete(PositionFactory.LATEST, payload, ctx);
return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), any(AddEntryCallback.class), any());

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
long lastMaxReadPositionMovedForwardTimestamp = topic.getLastMaxReadPositionMovedForwardTimestamp();
Expand Down Expand Up @@ -377,10 +378,10 @@ public void testPublishMessageMLFailure() throws Exception {

// override asyncAddEntry callback to return error
doAnswer((Answer<Object>) invocationOnMock -> {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addFailed(
new ManagedLedgerException("Managed ledger failure"), invocationOnMock.getArguments()[2]);
((AddEntryCallback) invocationOnMock.getArguments()[2]).addFailed(
new ManagedLedgerException("Managed ledger failure"), invocationOnMock.getArguments()[3]);
return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), any(AddEntryCallback.class), any());

topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
if (exception == null) {
Expand Down Expand Up @@ -1421,11 +1422,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {

// call addComplete on ledger asyncAddEntry
doAnswer(invocationOnMock -> {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(PositionFactory.create(1, 1),
((AddEntryCallback) invocationOnMock.getArguments()[2]).addComplete(PositionFactory.create(1, 1),
null,
invocationOnMock.getArguments()[2]);
invocationOnMock.getArguments()[3]);
return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), any(AddEntryCallback.class), any());

// call openCursorComplete on cursor asyncOpen
doAnswer(invocationOnMock -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2943,12 +2943,12 @@ private void setupMLAsyncCallbackMocks() {

// call addComplete on ledger asyncAddEntry
doAnswer((Answer<Object>) invocationOnMock -> {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(
((AddEntryCallback) invocationOnMock.getArguments()[2]).addComplete(
PositionFactory.create(-1, -1),
null,
invocationOnMock.getArguments()[2]);
invocationOnMock.getArguments()[3]);
return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), any(AddEntryCallback.class), any());

doAnswer((Answer<Object>) invocationOnMock -> true).when(cursorMock).isDurable();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -284,7 +285,7 @@ public void testIsDuplicateWithFailure() {

persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(PositionFactory.create(0, 1), null, publishContext1);
verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
Long lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 0);
Expand All @@ -294,7 +295,7 @@ public void testIsDuplicateWithFailure() {

persistentTopic.publishMessage(byteBuf2, publishContext2);
persistentTopic.addComplete(PositionFactory.create(0, 2), null, publishContext2);
verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
Expand All @@ -306,7 +307,7 @@ public void testIsDuplicateWithFailure() {
publishContext1 = getPublishContext(producerName1, 1);
persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(PositionFactory.create(0, 3), null, publishContext1);
verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
Expand All @@ -318,7 +319,7 @@ public void testIsDuplicateWithFailure() {
publishContext1 = getPublishContext(producerName1, 5);
persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(PositionFactory.create(0, 4), null, publishContext1);
verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
Expand All @@ -330,7 +331,7 @@ public void testIsDuplicateWithFailure() {
byteBuf1 = getMessage(producerName1, 0);
publishContext1 = getPublishContext(producerName1, 0);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
Expand All @@ -341,7 +342,7 @@ public void testIsDuplicateWithFailure() {
publishContext1 = getPublishContext(producerName1, 6);
// don't complete message
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 6);
Expand All @@ -353,7 +354,7 @@ public void testIsDuplicateWithFailure() {
byteBuf1 = getMessage(producerName1, 6);
publishContext1 = getPublishContext(producerName1, 6);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
verify(publishContext1, times(1)).completed(any(MessageDeduplication.MessageDupUnknownException.class), eq(-1L), eq(-1L));

// complete seq 6 message eventually
Expand All @@ -363,7 +364,7 @@ public void testIsDuplicateWithFailure() {
byteBuf1 = getMessage(producerName1, 7);
publishContext1 = getPublishContext(producerName1, 7);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());

persistentTopic.addFailed(new ManagedLedgerException("test"), publishContext1);
// check highestSequencedPushed is reset
Expand All @@ -383,7 +384,7 @@ public void testIsDuplicateWithFailure() {
byteBuf1 = getMessage(producerName1, 6);
publishContext1 = getPublishContext(producerName1, 6);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
verify(publishContext1, times(1)).completed(eq(null), eq(-1L), eq(-1L));
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
Expand All @@ -393,7 +394,7 @@ public void testIsDuplicateWithFailure() {
byteBuf1 = getMessage(producerName1, 8);
publishContext1 = getPublishContext(producerName1, 8);
persistentTopic.publishMessage(byteBuf1, publishContext1);
verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), any(), any());
verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), anyInt(), any(), any());
persistentTopic.addComplete(PositionFactory.create(0, 5), null, publishContext1);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
Expand Down

0 comments on commit a605ea3

Please sign in to comment.