diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java index 9c525ed70..b6e333361 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java @@ -57,7 +57,6 @@ public ConsumerBlockItemObserver( final StreamObserver subscribeStreamResponseObserver) { this.timeoutThresholdMillis = timeoutThresholdMillis; - this.producerLivenessClock = producerLivenessClock; this.streamMediator = streamMediator; // The ServerCallStreamObserver can be configured with a Runnable to @@ -80,45 +79,31 @@ public ConsumerBlockItemObserver( } this.subscribeStreamResponseObserver = subscribeStreamResponseObserver; + this.producerLivenessClock = producerLivenessClock; + this.producerLivenessMillis = producerLivenessClock.millis(); } /** Pass the block to the observer provided by Helidon */ @Override public void onEvent(final ObjectEvent event, final long l, final boolean b) { - if (producerLivenessClock.millis() - producerLivenessMillis > timeoutThresholdMillis) { - - // if (isThresholdExceeded(producerLivenessMillis)) { + final long currentMillis = producerLivenessClock.millis(); + if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) { streamMediator.unsubscribe(this); + LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribed handler"); } else { - // Refresh the producer liveness and pass the block to the observer. - producerLivenessMillis = producerLivenessClock.millis(); + // Refresh the producer liveness and pass the BlockItem to the downstream observer. + producerLivenessMillis = currentMillis; - final BlockItem blockItem = event.get(); + // Construct a response final SubscribeStreamResponse subscribeStreamResponse = - SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); + SubscribeStreamResponse.newBuilder().setBlockItem(event.get()).build(); subscribeStreamResponseObserver.onNext(subscribeStreamResponse); } } - // private boolean isThresholdExceeded(long livenessMillis) { - // final long currentTimeMillis = Clock.systemDefaultZone().millis(); - // final long elapsedMillis = currentTimeMillis - livenessMillis; - // if (elapsedMillis > timeoutThresholdMillis) { - // LOGGER.log( - // System.Logger.Level.INFO, - // "Elapsed milliseconds: " - // + elapsedMillis - // + ", timeout threshold: " - // + timeoutThresholdMillis); - // return true; - // } - // - // return false; - // } - @Override public void awaitShutdown() throws InterruptedException { shutdownLatch.await(); diff --git a/server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java b/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java similarity index 56% rename from server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java rename to server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java index 5a6170cee..3f2cf3e61 100644 --- a/server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java +++ b/server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java @@ -18,21 +18,20 @@ import static com.hedera.block.protos.BlockStreamService.BlockItem; import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse; +import static com.hedera.block.server.util.TestClock.buildClockInsideWindow; +import static com.hedera.block.server.util.TestClock.buildClockOutsideWindow; import static org.mockito.Mockito.*; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; import io.grpc.stub.StreamObserver; -import java.time.Instant; -import java.time.InstantSource; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -public class LiveStreamObserverImplTest { +public class ConsumerBlockItemObserverTest { private final long TIMEOUT_THRESHOLD_MILLIS = 50L; private final long TEST_TIME = 1_719_427_664_950L; @@ -44,8 +43,7 @@ public class LiveStreamObserverImplTest { @Mock private ObjectEvent objectEvent; @Test - @Disabled - public void testConsumerTimeoutWithinWindow() { + public void testProducerTimeoutWithinWindow() { final var consumerBlockItemObserver = new ConsumerBlockItemObserver( TIMEOUT_THRESHOLD_MILLIS, @@ -61,37 +59,8 @@ public void testConsumerTimeoutWithinWindow() { consumerBlockItemObserver.onEvent(objectEvent, 0, true); - // verify the observer is called with the next - // block and the stream mediator is not unsubscribed + // verify the observer is called with the next BlockItem verify(responseStreamObserver).onNext(subscribeStreamResponse); - verify(streamMediator, never()).unsubscribe(consumerBlockItemObserver); - } - - @Test - public void testConsumerTimeoutOutsideWindow() { - - final var consumerBlockItemObserver = - new ConsumerBlockItemObserver( - TIMEOUT_THRESHOLD_MILLIS, - buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), - streamMediator, - responseStreamObserver); - - consumerBlockItemObserver.onEvent(objectEvent, 1, true); - verify(streamMediator).unsubscribe(consumerBlockItemObserver); - } - - @Test - @Disabled - public void testProducerTimeoutWithinWindow() { - final var consumerBlockItemObserver = - new ConsumerBlockItemObserver( - TIMEOUT_THRESHOLD_MILLIS, - buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), - streamMediator, - responseStreamObserver); - - consumerBlockItemObserver.onEvent(objectEvent, 0, true); // verify the mediator is NOT called to unsubscribe the observer verify(streamMediator, never()).unsubscribe(consumerBlockItemObserver); @@ -111,39 +80,4 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException { consumerBlockItemObserver.onEvent(objectEvent, 0, true); verify(streamMediator).unsubscribe(consumerBlockItemObserver); } - - private static InstantSource buildClockInsideWindow( - long testTime, long timeoutThresholdMillis) { - return new TestClock(testTime, testTime + timeoutThresholdMillis - 1); - } - - private static InstantSource buildClockOutsideWindow( - long testTime, long timeoutThresholdMillis) { - return new TestClock(testTime, testTime + timeoutThresholdMillis + 1); - } - - static class TestClock implements InstantSource { - - private int index; - private final Long[] millis; - - TestClock(Long... millis) { - this.millis = millis; - } - - @Override - public long millis() { - long value = millis[index]; - - // cycle through the provided millis - // and wrap around if necessary - index = index > millis.length - 1 ? 0 : index + 1; - return value; - } - - @Override - public Instant instant() { - return null; - } - } } diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 0123924ea..21c669b07 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -16,9 +16,8 @@ package com.hedera.block.server.mediator; -import static com.hedera.block.protos.BlockStreamService.Block; -import static com.hedera.block.protos.BlockStreamService.BlockItem; -import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse; +import static com.hedera.block.protos.BlockStreamService.*; +import static com.hedera.block.server.util.TestClock.buildClockInsideWindow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.times; @@ -30,8 +29,6 @@ import com.hedera.block.server.persistence.WriteThroughCacheHandler; import com.hedera.block.server.persistence.storage.BlockStorage; import io.grpc.stub.StreamObserver; -import java.time.Clock; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -48,7 +45,9 @@ public class LiveStreamMediatorImplTest { @Mock private BlockStorage blockStorage; - @Mock private StreamObserver streamObserver; + @Mock private StreamObserver streamObserver1; + @Mock private StreamObserver streamObserver2; + @Mock private StreamObserver streamObserver3; @Test public void testUnsubscribeEach() { @@ -104,24 +103,33 @@ public void testMediatorPersistenceWithoutSubscribers() { } @Test - @Disabled - public void testMediatorPublishEventToSubscribers() { + public void testMediatorPublishEventToSubscribers() throws InterruptedException { - final long TEST_TIMEOUT = 10000; + final long TIMEOUT_THRESHOLD_MILLIS = 100L; + final long TEST_TIME = 1_719_427_664_950L; final var streamMediator = new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)); final var concreteObserver1 = new ConsumerBlockItemObserver( - TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver); + TIMEOUT_THRESHOLD_MILLIS, + buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + streamMediator, + streamObserver1); final var concreteObserver2 = new ConsumerBlockItemObserver( - TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver); + TIMEOUT_THRESHOLD_MILLIS, + buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + streamMediator, + streamObserver2); final var concreteObserver3 = new ConsumerBlockItemObserver( - TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver); + TIMEOUT_THRESHOLD_MILLIS, + buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS), + streamMediator, + streamObserver3); // Set up the subscribers streamMediator.subscribe(concreteObserver1); @@ -139,13 +147,21 @@ public void testMediatorPublishEventToSubscribers() { "Expected the mediator to have observer3 subscribed"); final BlockItem blockItem = BlockItem.newBuilder().build(); + final SubscribeStreamResponse subscribeStreamResponse = + SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); // Acting as a producer, notify the mediator of a new block streamMediator.publishEvent(blockItem); + // TODO: Is there a better way? + synchronized (streamObserver1) { + streamObserver1.wait(2000); + } + // Confirm each subscriber was notified of the new block - verify(streamObserver, times(3)) - .onNext(SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build()); + verify(streamObserver1, times(1)).onNext(subscribeStreamResponse); + verify(streamObserver2, times(1)).onNext(subscribeStreamResponse); + verify(streamObserver3, times(1)).onNext(subscribeStreamResponse); // Confirm the BlockStorage write method was // called despite the absence of subscribers diff --git a/server/src/test/java/com/hedera/block/server/util/TestClock.java b/server/src/test/java/com/hedera/block/server/util/TestClock.java new file mode 100644 index 000000000..c32cd98c8 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/util/TestClock.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.util; + +import java.time.Instant; +import java.time.InstantSource; + +public class TestClock implements InstantSource { + + private int index; + private final Long[] millis; + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + public TestClock(Long... millis) { + if (millis.length == 0) { + throw new IllegalArgumentException("At least 1 argument is required"); + } + + this.millis = millis; + } + + @Override + public long millis() { + LOGGER.log(System.Logger.Level.INFO, "millis() called"); + long value = millis[index]; + + // cycle through the provided millis + // and wrap around if necessary + int temp = index + 1; + index = (temp % millis.length == 0) ? 0 : temp; + return value; + } + + @Override + public Instant instant() { + return null; + } + + public static InstantSource buildClockInsideWindow(long testTime, long timeoutThresholdMillis) { + return new TestClock(testTime, testTime + timeoutThresholdMillis - 1); + } + + public static InstantSource buildClockOutsideWindow( + long testTime, long timeoutThresholdMillis) { + return new TestClock(testTime, testTime + timeoutThresholdMillis + 1); + } +}