diff --git a/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts b/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts index aca3f89df..ae2ad42a7 100644 --- a/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts +++ b/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts @@ -87,4 +87,8 @@ extraJavaModuleInfo { } module("io.grpc:grpc-util", "io.grpc.util") module("io.perfmark:perfmark-api", "io.perfmark") + + module("junit:junit", "junit") + module("org.mockito:mockito-core", "org.mockito") + module("org.mockito:mockito-junit-jupiter", "org.mockito.junit.jupiter") } \ No newline at end of file diff --git a/gradle/modules.properties b/gradle/modules.properties index e609ece8a..f95b58a4f 100644 --- a/gradle/modules.properties +++ b/gradle/modules.properties @@ -3,4 +3,4 @@ io.helidon.webserver=io.helidon.webserver:helidon-webserver io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5 io.grpc=io.grpc:grpc-stub -grpc.protobuf=io.grpc:grpc-protobuf:1.20.0 +grpc.protobuf=io.grpc:grpc-protobuf diff --git a/server/build.gradle.kts b/server/build.gradle.kts index 4dd9bc885..711646e16 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -26,4 +26,6 @@ application { testModuleInfo { requires("org.junit.jupiter.api") + requires("org.mockito") + requires("org.mockito.junit.jupiter") } \ No newline at end of file diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index 7d9ce3b4b..f301272df 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -25,6 +25,8 @@ import io.grpc.stub.StreamObserver; import io.helidon.webserver.grpc.GrpcService; +import java.time.Clock; + import static com.hedera.block.server.Constants.*; /** @@ -117,6 +119,8 @@ private StreamObserver streamSource(f // Return a custom StreamObserver to handle streaming blocks from the producer. final LiveStreamObserver streamObserver = new LiveStreamObserverImpl( timeoutThresholdMillis, + Clock.systemDefaultZone(), + Clock.systemDefaultZone(), streamMediator, responseStreamObserver); diff --git a/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java index 4c94e2af0..6117430c7 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java +++ b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java @@ -20,6 +20,10 @@ import com.hedera.block.server.mediator.StreamMediator; import io.grpc.stub.StreamObserver; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; + /** * The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer * via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods. @@ -31,10 +35,14 @@ public class LiveStreamObserverImpl implements LiveStreamObserver mediator; private final StreamObserver responseStreamObserver; - private long consumerLivenessMillis; - private long producerLivenessMillis; private final long timeoutThresholdMillis; + private final Clock producerLivenessClock; + private Instant producerLivenessInstant; + + private final Clock consumerLivenessClock; + private Instant consumerLivenessInstant; + /** * Constructor for the LiveStreamObserverImpl class. * @@ -43,15 +51,19 @@ public class LiveStreamObserverImpl implements LiveStreamObserver mediator, final StreamObserver responseStreamObserver) { + this.timeoutThresholdMillis = timeoutThresholdMillis; + this.producerLivenessClock = producerLivenessClock; + this.consumerLivenessClock = consumerLivenessClock; this.mediator = mediator; this.responseStreamObserver = responseStreamObserver; - this.timeoutThresholdMillis = timeoutThresholdMillis; - this.consumerLivenessMillis = System.currentTimeMillis(); - this.producerLivenessMillis = System.currentTimeMillis(); + this.producerLivenessInstant = Instant.now(producerLivenessClock); + this.consumerLivenessInstant = Instant.now(consumerLivenessClock); } /** @@ -62,13 +74,15 @@ public LiveStreamObserverImpl( @Override public void notify(final BlockStreamServiceGrpcProto.Block block) { - if (System.currentTimeMillis() - consumerLivenessMillis > timeoutThresholdMillis) { + // Check if the consumer has timed out. If so, unsubscribe the observer from the mediator. + if (Duration.between(consumerLivenessInstant, Instant.now(consumerLivenessClock)).toMillis() > timeoutThresholdMillis) { if (mediator.isSubscribed(this)) { LOGGER.log(System.Logger.Level.DEBUG, "Consumer timeout threshold exceeded. Unsubscribing observer."); mediator.unsubscribe(this); } } else { - producerLivenessMillis = System.currentTimeMillis(); + // Refresh the producer liveness and pass the block to the observer. + producerLivenessInstant = Instant.now(producerLivenessClock); responseStreamObserver.onNext(block); } } @@ -81,14 +95,12 @@ public void notify(final BlockStreamServiceGrpcProto.Block block) { @Override public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) { - if (System.currentTimeMillis() - producerLivenessMillis > timeoutThresholdMillis) { - if (mediator.isSubscribed(this)) { - LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer."); - mediator.unsubscribe(this); - } + if (Duration.between(producerLivenessInstant, Instant.now(producerLivenessClock)).toMillis() > timeoutThresholdMillis) { + LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer."); + mediator.unsubscribe(this); } else { LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse); - consumerLivenessMillis = System.currentTimeMillis(); + consumerLivenessInstant = Instant.now(consumerLivenessClock); } } diff --git a/server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java b/server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java new file mode 100644 index 000000000..b744925d6 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.consumer; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.mediator.StreamMediator; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Clock; + +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class LiveStreamObserverImplTest { + + @Mock + private StreamMediator streamMediator; + + @Mock + private StreamObserver responseStreamObserver; + + + @Test + public void testConsumerTimeoutWithinWindow() { + final LiveStreamObserver liveStreamObserver = new LiveStreamObserverImpl( + 50, + Clock.systemDefaultZone(), + Clock.systemDefaultZone(), + streamMediator, + responseStreamObserver); + BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build(); + liveStreamObserver.notify(newBlock); + + // verify the observer is called with the next + // block and the stream mediator is not unsubscribed + verify(responseStreamObserver).onNext(newBlock); + verify(streamMediator, never()).unsubscribe(liveStreamObserver); + } + + @Test + public void testConsumerTimeoutOutsideWindow() throws InterruptedException { + final LiveStreamObserver liveStreamObserver = new LiveStreamObserverImpl( + 50, + Clock.systemDefaultZone(), + Clock.systemDefaultZone(), + streamMediator, + responseStreamObserver); + + Thread.sleep(51); + BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build(); + when(streamMediator.isSubscribed(liveStreamObserver)).thenReturn(true); + liveStreamObserver.notify(newBlock); + verify(streamMediator).unsubscribe(liveStreamObserver); + } + + @Test + public void testProducerTimeoutWithinWindow() { + final LiveStreamObserver liveStreamObserver = new LiveStreamObserverImpl( + 50, + Clock.systemDefaultZone(), + Clock.systemDefaultZone(), + streamMediator, + responseStreamObserver); + + BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build(); + liveStreamObserver.onNext(blockResponse); + + // verify the mediator is NOT called to unsubscribe the observer + verify(streamMediator, never()).unsubscribe(liveStreamObserver); + } + + @Test + public void testProducerTimeoutOutsideWindow() throws InterruptedException { + final LiveStreamObserver liveStreamObserver = new LiveStreamObserverImpl( + 50, + Clock.systemDefaultZone(), + Clock.systemDefaultZone(), + streamMediator, + responseStreamObserver); + + Thread.sleep(51); + BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build(); + liveStreamObserver.onNext(blockResponse); + + verify(streamMediator).unsubscribe(liveStreamObserver); + } +} diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java new file mode 100644 index 000000000..2db3a4547 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.mediator; + + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.consumer.LiveStreamObserver; +import com.hedera.block.server.persistence.WriteThroughCacheHandler; +import com.hedera.block.server.persistence.cache.BlockCache; +import com.hedera.block.server.persistence.storage.BlockStorage; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +public class LiveStreamMediatorImplTest { + + @Mock + private LiveStreamObserver liveStreamObserver1; + + @Mock + private LiveStreamObserver liveStreamObserver2; + + @Mock + private LiveStreamObserver liveStreamObserver3; + + @Mock + private BlockStorage blockStorage; + + @Mock + private BlockCache blockCache; + + @Test + public void testUnsubscribeAll() { + + final StreamMediator streamMediator = + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache)); + + // Set up the subscribers + streamMediator.subscribe(liveStreamObserver1); + streamMediator.subscribe(liveStreamObserver2); + streamMediator.subscribe(liveStreamObserver3); + + assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed"); + + streamMediator.unsubscribeAll(); + + assertFalse(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have unsubscribed liveStreamObserver1"); + assertFalse(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have unsubscribed liveStreamObserver2"); + assertFalse(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have unsubscribed liveStreamObserver3"); + } + + @Test + public void testUnsubscribeEach() { + + final StreamMediator streamMediator = + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache)); + + // Set up the subscribers + streamMediator.subscribe(liveStreamObserver1); + streamMediator.subscribe(liveStreamObserver2); + streamMediator.subscribe(liveStreamObserver3); + + assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed"); + + streamMediator.unsubscribe(liveStreamObserver1); + assertFalse(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have unsubscribed liveStreamObserver1"); + + streamMediator.unsubscribe(liveStreamObserver2); + assertFalse(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have unsubscribed liveStreamObserver2"); + + streamMediator.unsubscribe(liveStreamObserver3); + assertFalse(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have unsubscribed liveStreamObserver3"); + } + + @Test + public void testMediatorPersistenceWithoutSubscribers() { + + final StreamMediator streamMediator = + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache)); + + final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build(); + + // Acting as a producer, notify the mediator of a new block + streamMediator.notifyAll(newBlock); + + // Confirm the block was persisted to storage and cache + // even though there are no subscribers + verify(blockStorage).write(newBlock); + verify(blockCache).insert(newBlock); + } + + @Test + public void testMediatorNotifyAll() { + + final StreamMediator streamMediator = + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache)); + + // Set up the subscribers + streamMediator.subscribe(liveStreamObserver1); + streamMediator.subscribe(liveStreamObserver2); + streamMediator.subscribe(liveStreamObserver3); + + assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed"); + assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed"); + + final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build(); + + // Acting as a producer, notify the mediator of a new block + streamMediator.notifyAll(newBlock); + + // Confirm each subscriber was notified of the new block + verify(liveStreamObserver1).notify(newBlock); + verify(liveStreamObserver2).notify(newBlock); + verify(liveStreamObserver3).notify(newBlock); + + // Confirm the block was persisted to storage and cache + verify(blockStorage).write(newBlock); + verify(blockCache).insert(newBlock); + } + +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 2e0cdac7a..9a444f5a7 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -40,7 +40,9 @@ dependencyResolutionManagement { // Testing only versions version("org.assertj.core", "3.23.1") - version("org.junit.jupiter.api", "5.10.0") + version("org.junit.jupiter.api", "5.10.2") + version("org.mockito", "5.8.0") + version("org.mockito.junit.jupiter", "5.8.0") } } }