Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 committed Jul 29, 2024
1 parent 0485389 commit 9efbe8c
Showing 1 changed file with 62 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,45 @@

package com.hedera.block.server.consumer;

import static org.mockito.Mockito.*;

import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.time.InstantSource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.time.Clock;
import java.time.Instant;
import java.time.InstantSource;
import java.time.ZoneId;

import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
public class LiveStreamObserverImplTest {

private final long TIMEOUT_THRESHOLD_MILLIS = 60L;
private final long TEST_TIME = 1_719_427_664_950L;

@Mock
private StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;

@Mock
private StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;
private StreamMediator<
BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse>
streamMediator;

@Mock private StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;

@Test
public void testConsumerTimeoutWithinWindow() {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);
BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
final LiveStreamObserver<
BlockStreamServiceGrpcProto.Block,
BlockStreamServiceGrpcProto.BlockResponse>
liveStreamObserver =
new LiveStreamObserverImpl(
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);
BlockStreamServiceGrpcProto.Block newBlock =
BlockStreamServiceGrpcProto.Block.newBuilder().build();
liveStreamObserver.notify(newBlock);

// verify the observer is called with the next
Expand All @@ -64,29 +66,39 @@ public void testConsumerTimeoutWithinWindow() {
@Test
public void testConsumerTimeoutOutsideWindow() throws InterruptedException {

final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
TIMEOUT_THRESHOLD_MILLIS,
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
final LiveStreamObserver<
BlockStreamServiceGrpcProto.Block,
BlockStreamServiceGrpcProto.BlockResponse>
liveStreamObserver =
new LiveStreamObserverImpl(
TIMEOUT_THRESHOLD_MILLIS,
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

final BlockStreamServiceGrpcProto.Block newBlock =
BlockStreamServiceGrpcProto.Block.newBuilder().build();
when(streamMediator.isSubscribed(liveStreamObserver)).thenReturn(true);
liveStreamObserver.notify(newBlock);
verify(streamMediator).unsubscribe(liveStreamObserver);
}

@Test
public void testProducerTimeoutWithinWindow() {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
final LiveStreamObserver<
BlockStreamServiceGrpcProto.Block,
BlockStreamServiceGrpcProto.BlockResponse>
liveStreamObserver =
new LiveStreamObserverImpl(
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

BlockStreamServiceGrpcProto.BlockResponse blockResponse =
BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
liveStreamObserver.onNext(blockResponse);

// verify the mediator is NOT called to unsubscribe the observer
Expand All @@ -95,25 +107,32 @@ public void testProducerTimeoutWithinWindow() {

@Test
public void testProducerTimeoutOutsideWindow() throws InterruptedException {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
TIMEOUT_THRESHOLD_MILLIS,
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);
final LiveStreamObserver<
BlockStreamServiceGrpcProto.Block,
BlockStreamServiceGrpcProto.BlockResponse>
liveStreamObserver =
new LiveStreamObserverImpl(
TIMEOUT_THRESHOLD_MILLIS,
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

Thread.sleep(51);
BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
BlockStreamServiceGrpcProto.BlockResponse blockResponse =
BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
liveStreamObserver.onNext(blockResponse);

verify(streamMediator).unsubscribe(liveStreamObserver);
}

private static InstantSource buildClockInsideWindow(long testTime, long timeoutThresholdMillis) {
private static InstantSource buildClockInsideWindow(
long testTime, long timeoutThresholdMillis) {
return new TestClock(testTime, testTime + timeoutThresholdMillis - 1);
}

private static InstantSource buildClockOutsideWindow(long testTime, long timeoutThresholdMillis) {
private static InstantSource buildClockOutsideWindow(
long testTime, long timeoutThresholdMillis) {
return new TestClock(testTime, testTime + timeoutThresholdMillis + 1);
}

Expand Down

0 comments on commit 9efbe8c

Please sign in to comment.