Skip to content

Commit

Permalink
fix: enabled mockito. changed LiveStreamObserverImpl to use Java Cloc…
Browse files Browse the repository at this point in the history
…k. added tests"

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jun 26, 2024
1 parent 95e4da4 commit 48c8d90
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ extraJavaModuleInfo {
}
module("io.grpc:grpc-util", "io.grpc.util")
module("io.perfmark:perfmark-api", "io.perfmark")

module("junit:junit", "junit")
module("org.mockito:mockito-core", "org.mockito")
module("org.mockito:mockito-junit-jupiter", "org.mockito.junit.jupiter")
}
2 changes: 1 addition & 1 deletion gradle/modules.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ io.helidon.webserver=io.helidon.webserver:helidon-webserver
io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc
io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5
io.grpc=io.grpc:grpc-stub
grpc.protobuf=io.grpc:grpc-protobuf:1.20.0
grpc.protobuf=io.grpc:grpc-protobuf
2 changes: 2 additions & 0 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ application {

testModuleInfo {
requires("org.junit.jupiter.api")
requires("org.mockito")
requires("org.mockito.junit.jupiter")
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;

import java.time.Clock;

import static com.hedera.block.server.Constants.*;

/**
Expand Down Expand Up @@ -117,6 +119,8 @@ private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(f
// Return a custom StreamObserver to handle streaming blocks from the producer.
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/**
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
Expand All @@ -31,10 +35,14 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator;
private final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;

private long consumerLivenessMillis;
private long producerLivenessMillis;
private final long timeoutThresholdMillis;

private final Clock producerLivenessClock;
private Instant producerLivenessInstant;

private final Clock consumerLivenessClock;
private Instant consumerLivenessInstant;

/**
* Constructor for the LiveStreamObserverImpl class.
*
Expand All @@ -43,15 +51,19 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer
*/
public LiveStreamObserverImpl(
final long timeoutThresholdMillis,
final Clock producerLivenessClock,
final Clock consumerLivenessClock,
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator,
final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.producerLivenessClock = producerLivenessClock;
this.consumerLivenessClock = consumerLivenessClock;
this.mediator = mediator;
this.responseStreamObserver = responseStreamObserver;

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.consumerLivenessMillis = System.currentTimeMillis();
this.producerLivenessMillis = System.currentTimeMillis();
this.producerLivenessInstant = Instant.now(producerLivenessClock);
this.consumerLivenessInstant = Instant.now(consumerLivenessClock);
}

/**
Expand All @@ -62,13 +74,15 @@ public LiveStreamObserverImpl(
@Override
public void notify(final BlockStreamServiceGrpcProto.Block block) {

if (System.currentTimeMillis() - consumerLivenessMillis > timeoutThresholdMillis) {
// Check if the consumer has timed out. If so, unsubscribe the observer from the mediator.
if (Duration.between(consumerLivenessInstant, Instant.now(consumerLivenessClock)).toMillis() > timeoutThresholdMillis) {
if (mediator.isSubscribed(this)) {
LOGGER.log(System.Logger.Level.DEBUG, "Consumer timeout threshold exceeded. Unsubscribing observer.");
mediator.unsubscribe(this);
}
} else {
producerLivenessMillis = System.currentTimeMillis();
// Refresh the producer liveness and pass the block to the observer.
producerLivenessInstant = Instant.now(producerLivenessClock);
responseStreamObserver.onNext(block);
}
}
Expand All @@ -81,14 +95,12 @@ public void notify(final BlockStreamServiceGrpcProto.Block block) {
@Override
public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) {

if (System.currentTimeMillis() - producerLivenessMillis > timeoutThresholdMillis) {
if (mediator.isSubscribed(this)) {
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
mediator.unsubscribe(this);
}
if (Duration.between(producerLivenessInstant, Instant.now(producerLivenessClock)).toMillis() > timeoutThresholdMillis) {
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
mediator.unsubscribe(this);
} else {
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse);
consumerLivenessMillis = System.currentTimeMillis();
consumerLivenessInstant = Instant.now(consumerLivenessClock);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.consumer;

import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.time.Clock;

import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
public class LiveStreamObserverImplTest {

@Mock
private StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;

@Mock
private StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;


@Test
public void testConsumerTimeoutWithinWindow() {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
50,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);
BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
liveStreamObserver.notify(newBlock);

// verify the observer is called with the next
// block and the stream mediator is not unsubscribed
verify(responseStreamObserver).onNext(newBlock);
verify(streamMediator, never()).unsubscribe(liveStreamObserver);
}

@Test
public void testConsumerTimeoutOutsideWindow() throws InterruptedException {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
50,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);

Thread.sleep(51);
BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
when(streamMediator.isSubscribed(liveStreamObserver)).thenReturn(true);
liveStreamObserver.notify(newBlock);
verify(streamMediator).unsubscribe(liveStreamObserver);
}

@Test
public void testProducerTimeoutWithinWindow() {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
50,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);

BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
liveStreamObserver.onNext(blockResponse);

// verify the mediator is NOT called to unsubscribe the observer
verify(streamMediator, never()).unsubscribe(liveStreamObserver);
}

@Test
public void testProducerTimeoutOutsideWindow() throws InterruptedException {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
50,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);

Thread.sleep(51);
BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
liveStreamObserver.onNext(blockResponse);

verify(streamMediator).unsubscribe(liveStreamObserver);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.mediator;


import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.consumer.LiveStreamObserver;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.cache.BlockCache;
import com.hedera.block.server.persistence.storage.BlockStorage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class LiveStreamMediatorImplTest {

@Mock
private LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver1;

@Mock
private LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver2;

@Mock
private LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver3;

@Mock
private BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage;

@Mock
private BlockCache<BlockStreamServiceGrpcProto.Block> blockCache;

@Test
public void testUnsubscribeAll() {

final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));

// Set up the subscribers
streamMediator.subscribe(liveStreamObserver1);
streamMediator.subscribe(liveStreamObserver2);
streamMediator.subscribe(liveStreamObserver3);

assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed");
assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed");
assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed");

streamMediator.unsubscribeAll();

assertFalse(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have unsubscribed liveStreamObserver1");
assertFalse(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have unsubscribed liveStreamObserver2");
assertFalse(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have unsubscribed liveStreamObserver3");
}

@Test
public void testUnsubscribeEach() {

final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));

// Set up the subscribers
streamMediator.subscribe(liveStreamObserver1);
streamMediator.subscribe(liveStreamObserver2);
streamMediator.subscribe(liveStreamObserver3);

assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed");
assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed");
assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed");

streamMediator.unsubscribe(liveStreamObserver1);
assertFalse(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have unsubscribed liveStreamObserver1");

streamMediator.unsubscribe(liveStreamObserver2);
assertFalse(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have unsubscribed liveStreamObserver2");

streamMediator.unsubscribe(liveStreamObserver3);
assertFalse(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have unsubscribed liveStreamObserver3");
}

@Test
public void testMediatorPersistenceWithoutSubscribers() {

final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));

final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();

// Acting as a producer, notify the mediator of a new block
streamMediator.notifyAll(newBlock);

// Confirm the block was persisted to storage and cache
// even though there are no subscribers
verify(blockStorage).write(newBlock);
verify(blockCache).insert(newBlock);
}

@Test
public void testMediatorNotifyAll() {

final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));

// Set up the subscribers
streamMediator.subscribe(liveStreamObserver1);
streamMediator.subscribe(liveStreamObserver2);
streamMediator.subscribe(liveStreamObserver3);

assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed");
assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed");
assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed");

final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();

// Acting as a producer, notify the mediator of a new block
streamMediator.notifyAll(newBlock);

// Confirm each subscriber was notified of the new block
verify(liveStreamObserver1).notify(newBlock);
verify(liveStreamObserver2).notify(newBlock);
verify(liveStreamObserver3).notify(newBlock);

// Confirm the block was persisted to storage and cache
verify(blockStorage).write(newBlock);
verify(blockCache).insert(newBlock);
}

}
4 changes: 3 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ dependencyResolutionManagement {

// Testing only versions
version("org.assertj.core", "3.23.1")
version("org.junit.jupiter.api", "5.10.0")
version("org.junit.jupiter.api", "5.10.2")
version("org.mockito", "5.8.0")
version("org.mockito.junit.jupiter", "5.8.0")
}
}
}
Expand Down

0 comments on commit 48c8d90

Please sign in to comment.