Skip to content

Commit

Permalink
fix: replacing locks with mockito timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 1, 2024
1 parent d349634 commit fdbf663
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
public class BlockStreamServiceIT {

private final System.Logger LOGGER = System.getLogger(getClass().getName());
private final Object lock = new Object();

@Mock private BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
@Mock private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
Expand Down Expand Up @@ -127,30 +126,22 @@ public void testPublishBlockStreamRegistrationAndExecution()
// Calling onNext() as Helidon will
streamObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}

final ItemAcknowledgement itemAck = new ItemAckBuilder().buildAck(blockItem);
final PublishStreamResponse publishStreamResponse =
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();

// Verify the BlockItem message is sent to the mediator
verify(streamMediator, times(1)).publishEvent(blockItem);
verify(streamMediator, timeout(50).times(1)).publishEvent(blockItem);

// Verify our custom StreamObserver implementation builds and sends
// a response back to the producer
verify(publishStreamResponseObserver, times(1)).onNext(publishStreamResponse);
verify(publishStreamResponseObserver, timeout(50).times(1)).onNext(publishStreamResponse);

// Close the stream as Helidon does
streamObserver.onCompleted();

synchronized (lock) {
lock.wait(50);
}

// verify the onCompleted() method is invoked on the wrapped StreamObserver
verify(publishStreamResponseObserver, times(1)).onCompleted();
verify(publishStreamResponseObserver, timeout(50).times(1)).onCompleted();
}

@Test
Expand Down Expand Up @@ -191,16 +182,12 @@ public void testSubscribeBlockStream() throws InterruptedException {
// Calling onNext() with a BlockItem
streamObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}

final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build();

verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse);
}

@Test
Expand Down Expand Up @@ -228,10 +215,6 @@ public void testFullHappyPath() throws IOException, InterruptedException {
streamObserver.onNext(publishStreamRequest);
}

synchronized (lock) {
lock.wait(50);
}

verifySubscribeStreamResponse(
numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems);
verifySubscribeStreamResponse(
Expand Down Expand Up @@ -289,10 +272,6 @@ public void testFullWithSubscribersAddedDynamically() throws IOException, Interr
}
}

synchronized (lock) {
lock.wait(50);
}

// Verify subscribers who were listening before the stream started
verifySubscribeStreamResponse(
numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems);
Expand Down Expand Up @@ -384,10 +363,6 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep
}
}

synchronized (lock) {
lock.wait(50);
}

// Verify subscribers who were listening before the stream started
verifySubscribeStreamResponse(numberOfBlocks, 0, 10, subscribeStreamObserver1, blockItems);
verifySubscribeStreamResponse(numberOfBlocks, 0, 60, subscribeStreamObserver2, blockItems);
Expand Down Expand Up @@ -465,17 +440,13 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
// Simulate a consumer attempting to connect to the Block Node after the exception.
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver4);

synchronized (lock) {
lock.wait(50);
}

// The BlockItem passed through since it was published
// before the IOException was thrown.
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build();
verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse);

// Verify all the consumers received the end of stream response
// TODO: Fix the response code when it's available
Expand All @@ -485,9 +456,9 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
SubscribeStreamResponse.SubscribeStreamResponseCode
.READ_STREAM_SUCCESS)
.build();
verify(subscribeStreamObserver1, times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver2, times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver3, times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(endStreamResponse);
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(endStreamResponse);

// Verify all the consumers were unsubscribed
for (final var s : subscribers.keySet()) {
Expand All @@ -502,8 +473,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
.build();
final var endOfStreamResponse =
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
verify(publishStreamResponseObserver, times(2)).onNext(endOfStreamResponse);
verify(webServer, times(1)).stop();
verify(publishStreamResponseObserver, timeout(50).times(2)).onNext(endOfStreamResponse);
verify(webServer, timeout(50).times(1)).stop();

// Now verify the block was removed from the file system.
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
Expand All @@ -518,7 +489,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
SingleBlockResponse.SingleBlockResponseCode
.READ_BLOCK_NOT_AVAILABLE)
.build();
verify(singleBlockResponseStreamObserver, times(1)).onNext(expectedSingleBlockNotAvailable);
verify(singleBlockResponseStreamObserver, timeout(50).times(1))
.onNext(expectedSingleBlockNotAvailable);

// TODO: Fix the response code when it's available
final SubscribeStreamResponse expectedSubscriberStreamNotAvailable =
Expand All @@ -527,7 +499,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
SubscribeStreamResponse.SubscribeStreamResponseCode
.READ_STREAM_SUCCESS)
.build();
verify(subscribeStreamObserver4, times(1)).onNext(expectedSubscriberStreamNotAvailable);
verify(subscribeStreamObserver4, timeout(50).times(1))
.onNext(expectedSubscriberStreamNotAvailable);
}

private void removeRootPathWritePerms(final Config config) throws IOException {
Expand Down Expand Up @@ -562,9 +535,9 @@ private static void verifySubscribeStreamResponse(
final SubscribeStreamResponse stateProofStreamResponse =
buildSubscribeStreamResponse(stateProofBlockItem);

verify(streamObserver, times(1)).onNext(headerSubStreamResponse);
verify(streamObserver, times(8)).onNext(bodySubStreamResponse);
verify(streamObserver, times(1)).onNext(stateProofStreamResponse);
verify(streamObserver, timeout(50).times(1)).onNext(headerSubStreamResponse);
verify(streamObserver, timeout(50).times(8)).onNext(bodySubStreamResponse);
verify(streamObserver, timeout(50).times(1)).onNext(stateProofStreamResponse);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
@ExtendWith(MockitoExtension.class)
public class LiveStreamMediatorImplTest {

private final Object lock = new Object();

@Mock private BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> observer1;
@Mock private BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> observer2;
@Mock private BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> observer3;
Expand Down Expand Up @@ -166,14 +164,10 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup
// Acting as a producer, notify the mediator of a new block
streamMediator.publishEvent(blockItem);

synchronized (lock) {
lock.wait(50);
}

// Confirm each subscriber was notified of the new block
verify(streamObserver1, times(1)).onNext(subscribeStreamResponse);
verify(streamObserver2, times(1)).onNext(subscribeStreamResponse);
verify(streamObserver3, times(1)).onNext(subscribeStreamResponse);
verify(streamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(streamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(streamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse);

// Confirm the BlockStorage write method was called
verify(blockWriter).write(blockItem);
Expand Down Expand Up @@ -296,16 +290,13 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr
streamMediator.publishEvent(firstBlockItem);
fail("Expected an IOException to be thrown");
} catch (IOException e) {
synchronized (lock) {
lock.wait(50);
}

final BlockItem secondBlockItem = blockItems.get(1);
streamMediator.publishEvent(secondBlockItem);

// Confirm the BlockPersistenceHandler write method was only called
// once despite the second block being published.
verify(blockPersistenceHandler, times(1)).persist(firstBlockItem);
verify(blockPersistenceHandler, timeout(50).times(1)).persist(firstBlockItem);
}
}

Expand Down

0 comments on commit fdbf663

Please sign in to comment.