Skip to content

Commit

Permalink
fix: replaced lock
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 1, 2024
1 parent d9db451 commit 2529488
Showing 1 changed file with 20 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
@ExtendWith(MockitoExtension.class)
public class ProducerBlockItemObserverTest {

private final Object lock = new Object();

@Mock private ItemAckBuilder itemAckBuilder;
@Mock private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
@Mock private StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
Expand All @@ -63,11 +61,10 @@ public class ProducerBlockItemObserverTest {
@Mock private ServiceStatus serviceStatus;

@Test
public void testProducerOnNext()
throws InterruptedException, IOException, NoSuchAlgorithmException {
public void testProducerOnNext() throws IOException, NoSuchAlgorithmException {

List<BlockItem> blockItems = generateBlockItems(1);
ProducerBlockItemObserver producerBlockItemObserver =
final List<BlockItem> blockItems = generateBlockItems(1);
final ProducerBlockItemObserver producerBlockItemObserver =
new ProducerBlockItemObserver(
streamMediator,
publishStreamResponseObserver,
Expand All @@ -76,16 +73,12 @@ public void testProducerOnNext()

when(serviceStatus.isRunning()).thenReturn(true);

BlockItem blockHeader = blockItems.getFirst();
PublishStreamRequest publishStreamRequest =
final BlockItem blockHeader = blockItems.getFirst();
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockHeader).build();
producerBlockItemObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}

verify(streamMediator, times(1)).publishEvent(blockHeader);
verify(streamMediator, timeout(50).times(1)).publishEvent(blockHeader);

final ItemAcknowledgement itemAck =
ItemAcknowledgement.newBuilder()
Expand All @@ -95,12 +88,12 @@ public void testProducerOnNext()
BlockStreamService.PublishStreamResponse.newBuilder()
.setAcknowledgement(itemAck)
.build();
verify(publishStreamResponseObserver, times(1)).onNext(publishStreamResponse);
verify(publishStreamResponseObserver, timeout(50).times(1)).onNext(publishStreamResponse);

// Helidon will call onCompleted after onNext
producerBlockItemObserver.onCompleted();

verify(publishStreamResponseObserver, times(1)).onCompleted();
verify(publishStreamResponseObserver, timeout(50).times(1)).onCompleted();
}

@Test
Expand Down Expand Up @@ -160,18 +153,14 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti
new ItemAckBuilder(),
serviceStatus);

PublishStreamRequest publishStreamRequest =
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockItem).build();
producerBlockItemObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}

// Confirm each subscriber was notified of the new block
verify(streamObserver1, times(1)).onNext(subscribeStreamResponse);
verify(streamObserver2, times(1)).onNext(subscribeStreamResponse);
verify(streamObserver3, times(1)).onNext(subscribeStreamResponse);
verify(streamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(streamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse);
verify(streamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse);

// Confirm the BlockStorage write method was
// called despite the absence of subscribers
Expand All @@ -180,23 +169,22 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti

@Test
public void testOnError() {
ProducerBlockItemObserver producerBlockItemObserver =
final ProducerBlockItemObserver producerBlockItemObserver =
new ProducerBlockItemObserver(
streamMediator,
publishStreamResponseObserver,
new ItemAckBuilder(),
serviceStatus);

Throwable t = new Throwable("Test error");
final Throwable t = new Throwable("Test error");
producerBlockItemObserver.onError(t);
verify(publishStreamResponseObserver).onError(t);
}

@Test
public void testItemAckBuilderExceptionTest()
throws InterruptedException, IOException, NoSuchAlgorithmException {
public void testItemAckBuilderExceptionTest() throws IOException, NoSuchAlgorithmException {

ProducerBlockItemObserver producerBlockItemObserver =
final ProducerBlockItemObserver producerBlockItemObserver =
new ProducerBlockItemObserver(
streamMediator,
publishStreamResponseObserver,
Expand All @@ -207,16 +195,12 @@ public void testItemAckBuilderExceptionTest()
when(itemAckBuilder.buildAck(any()))
.thenThrow(new NoSuchAlgorithmException("Test exception"));

List<BlockItem> blockItems = generateBlockItems(1);
BlockItem blockHeader = blockItems.getFirst();
PublishStreamRequest publishStreamRequest =
final List<BlockItem> blockItems = generateBlockItems(1);
final BlockItem blockHeader = blockItems.getFirst();
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockHeader).build();
producerBlockItemObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}

final PublishStreamResponse.EndOfStream endOfStream =
PublishStreamResponse.EndOfStream.newBuilder()
.setStatus(
Expand All @@ -225,6 +209,6 @@ public void testItemAckBuilderExceptionTest()
.build();
final PublishStreamResponse errorResponse =
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
verify(publishStreamResponseObserver, times(1)).onNext(errorResponse);
verify(publishStreamResponseObserver, timeout(50).times(1)).onNext(errorResponse);
}
}

0 comments on commit 2529488

Please sign in to comment.