Skip to content

Commit

Permalink
fix: added test coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 23, 2024
1 parent d82ccc6 commit 606562d
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void update(final Routing routing) {
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
}

private StreamObserver<PublishStreamRequest> publishBlockStream(
StreamObserver<PublishStreamRequest> publishBlockStream(
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
LOGGER.log(
System.Logger.Level.DEBUG,
Expand All @@ -104,7 +104,7 @@ private StreamObserver<PublishStreamRequest> publishBlockStream(
streamMediator, publishStreamResponseObserver, itemAckBuilder);
}

private void subscribeBlockStream(
void subscribeBlockStream(
final SubscribeStreamRequest subscribeStreamRequest,
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
LOGGER.log(
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ public class Server {

// Function stubs to satisfy the routing param signatures. The implementations are in the
// service class.
private static ServerCalls.BidiStreamingMethod<
static ServerCalls.BidiStreamingMethod<
StreamObserver<PublishStreamRequest>, StreamObserver<PublishStreamResponse>>
clientBidiStreamingMethod;

public static ServerCalls.ServerStreamingMethod<
static ServerCalls.ServerStreamingMethod<
SubscribeStreamRequest, StreamObserver<SubscribeStreamResponse>>
serverStreamingMethod;

Expand Down
220 changes: 220 additions & 0 deletions server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.ItemAcknowledgement;
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.google.protobuf.Descriptors;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.storage.BlockAsDirReader;
import com.hedera.block.server.persistence.storage.BlockAsDirWriter;
import com.hedera.block.server.persistence.storage.BlockReader;
import com.hedera.block.server.persistence.storage.BlockWriter;
import com.hedera.block.server.producer.ItemAckBuilder;
import com.hedera.block.server.util.TestUtils;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import io.helidon.config.Config;
import io.helidon.config.MapConfigSource;
import io.helidon.config.spi.ConfigSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class BlockStreamServiceIT {

private final System.Logger LOGGER = System.getLogger(getClass().getName());
private final Object lock = new Object();

@Mock private StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;

@Mock private StreamObserver<PublishStreamResponse> publishStreamResponseObserver;

@Mock private SubscribeStreamRequest subscribeStreamRequest;

@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver1;
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver2;
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver3;

@Mock private BlockReader<Block> blockReader;
@Mock private BlockWriter<BlockItem> blockWriter;
@Mock private Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> testCallback;

private static final String TEMP_DIR = "block-node-unit-test-dir";
private static final String JUNIT = "my-junit-test";

private Path testPath;
private Config testConfig;

@BeforeEach
public void setUp() throws IOException {
testPath = Files.createTempDirectory(TEMP_DIR);
LOGGER.log(System.Logger.Level.INFO, "Created temp directory: " + testPath.toString());

Map<String, String> testProperties = Map.of(JUNIT, testPath.toString());
ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build();
testConfig = Config.builder(testConfigSource).build();
}

@AfterEach
public void tearDown() {
TestUtils.deleteDirectory(testPath.toFile());
}

@Test
public void testPublishBlockStreamRegistrationAndExec()
throws InterruptedException, IOException, NoSuchAlgorithmException {

final BlockStreamService blockStreamService =
new BlockStreamService(50L, new ItemAckBuilder(), streamMediator);

final StreamObserver<PublishStreamRequest> streamObserver =
blockStreamService.publishBlockStream(publishStreamResponseObserver);

final BlockItem blockItem = generateBlockItems(1).get(0);
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockItem).build();

// Calling onNext() as Helidon will
streamObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}

final ItemAcknowledgement itemAck = new ItemAckBuilder().buildAck(blockItem);
final PublishStreamResponse publishStreamResponse =
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();

// Verify the BlockItem message is sent to the mediator
verify(streamMediator, times(1)).publishEvent(blockItem);

// Verify our custom StreamObserver implementation builds and sends
// a response back to the producer
verify(publishStreamResponseObserver, times(1)).onNext(publishStreamResponse);

// Close the stream as Helidon does
streamObserver.onCompleted();

synchronized (lock) {
lock.wait(50);
}

// verify the onCompleted() method is invoked on the wrapped StreamObserver
verify(publishStreamResponseObserver, times(1)).onCompleted();
}

@Test
public void testSubscribeBlockStream() throws InterruptedException {
final var streamMediator =
new LiveStreamMediatorImpl(
new WriteThroughCacheHandler(blockReader, blockWriter), testCallback);

// Build the BlockStreamService
final BlockStreamService blockStreamService =
new BlockStreamService(1000L, new ItemAckBuilder(), streamMediator);

// Subscribe the consumers
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1);
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2);
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3);

// Subscribe the producer
final StreamObserver<PublishStreamRequest> streamObserver =
blockStreamService.publishBlockStream(publishStreamResponseObserver);

// Build the BlockItem
final List<BlockItem> blockItems = generateBlockItems(1);
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockItems.get(0)).build();

// Calling onNext() with a BlockItem
streamObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}

final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.get(0)).build();

verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse);

}

@Test
public void testFullHappyPath() throws IOException, InterruptedException {

// Initialize with concrete a concrete BlockReader, BlockWriter and Mediator
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
final BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
final var streamMediator =
new LiveStreamMediatorImpl(
new WriteThroughCacheHandler(blockReader, blockWriter), testCallback);

// Build the BlockStreamService
final BlockStreamService blockStreamService =
new BlockStreamService(1000L, new ItemAckBuilder(), streamMediator);

// Pass a StreamObserver to the producer as Helidon will
final StreamObserver<PublishStreamRequest> streamObserver =
blockStreamService.publishBlockStream(publishStreamResponseObserver);

final List<BlockItem> blockItems = generateBlockItems(1);

blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1);
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2);
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3);

final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockItems.get(0)).build();
streamObserver.onNext(publishStreamRequest);

synchronized (lock) {
lock.wait(50);
}

final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.get(0)).build();
verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse);
verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.hedera.block.server;

import com.google.protobuf.Descriptors;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.producer.ItemAckBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class BlockStreamServiceTest {

@Mock
private ItemAckBuilder itemAckBuilder;

@Mock private StreamMediator<ObjectEvent<com.hedera.block.protos.BlockStreamService.BlockItem>, com.hedera.block.protos.BlockStreamService.BlockItem> streamMediator;

@Test
public void testServiceName() throws IOException, NoSuchAlgorithmException {
final BlockStreamService blockStreamService =
new BlockStreamService(50L, itemAckBuilder, streamMediator);

// Verify the service name
assertEquals(Constants.SERVICE_NAME, blockStreamService.serviceName());

// Verify other methods not invoked
verify(itemAckBuilder, never()).buildAck(any(BlockItem.class));
verify(streamMediator, never()).publishEvent(any(BlockItem.class));
}

@Test
public void testProto() throws IOException, NoSuchAlgorithmException {
final BlockStreamService blockStreamService =
new BlockStreamService(50L, itemAckBuilder, streamMediator);
Descriptors.FileDescriptor fileDescriptor = blockStreamService.proto();

// Verify the current rpc methods
assertEquals(2, fileDescriptor.getServices().get(0).getMethods().size());

// Verify other methods not invoked
verify(itemAckBuilder, never()).buildAck(any(BlockItem.class));
verify(streamMediator, never()).publishEvent(any(BlockItem.class));
}
}

0 comments on commit 606562d

Please sign in to comment.