Skip to content

Commit

Permalink
fix: enable streamvalidator to trigger serverstatus
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Sep 10, 2024
1 parent 2b6ecd7 commit c9f1d26
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.server.validator;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
Expand All @@ -29,20 +30,25 @@
public class StreamValidatorBuilder {
private final BlockWriter<BlockItem> blockWriter;
private final BlockNodeContext blockNodeContext;
private final ServiceStatus serviceStatus;

private SubscriptionHandler<SubscribeStreamResponse> subscriptionHandler;
private Notifier notifier;

private StreamValidatorBuilder(
@NonNull final BlockWriter<BlockItem> blockWriter,
@NonNull final BlockNodeContext blockNodeContext) {
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
this.blockWriter = blockWriter;
this.blockNodeContext = blockNodeContext;
this.serviceStatus = serviceStatus;
}

public static StreamValidatorBuilder newBuilder(
@NonNull final BlockWriter<BlockItem> blockWriter,
@NonNull final BlockNodeContext blockNodeContext) {
return new StreamValidatorBuilder(blockWriter, blockNodeContext);
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
return new StreamValidatorBuilder(blockWriter, blockNodeContext, serviceStatus);
}

public StreamValidatorBuilder subscriptionHandler(
Expand All @@ -58,6 +64,6 @@ public StreamValidatorBuilder notifier(@NonNull final Notifier notifier) {

public BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>> build() {
return new StreamValidatorImpl(
subscriptionHandler, blockWriter, notifier, blockNodeContext);
subscriptionHandler, blockWriter, notifier, blockNodeContext, serviceStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.server.validator;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
Expand All @@ -38,16 +39,19 @@ public class StreamValidatorImpl
private final BlockWriter<BlockItem> blockWriter;
private final Notifier notifier;
private final MetricsService metricsService;
private final ServiceStatus serviceStatus;

public StreamValidatorImpl(
@NonNull final SubscriptionHandler<SubscribeStreamResponse> subscriptionHandler,
@NonNull final BlockWriter<BlockItem> blockWriter,
@NonNull final Notifier notifier,
@NonNull final BlockNodeContext blockNodeContext) {
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
this.subscriptionHandler = subscriptionHandler;
this.blockWriter = blockWriter;
this.notifier = notifier;
this.metricsService = blockNodeContext.metricsService();
this.serviceStatus = serviceStatus;
}

@Override
Expand All @@ -64,6 +68,9 @@ public void onEvent(

} catch (IOException e) {

// Trigger the server to stop accepting new requests
serviceStatus.stopRunning(getClass().getName());

// Unsubscribe from the mediator to avoid additional onEvent calls.
subscriptionHandler.unsubscribe(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.server.validator;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.persistence.storage.write.BlockWriter;
import com.hedera.hapi.block.stream.BlockItem;
Expand All @@ -31,7 +32,8 @@ public interface ValidatorInjectionModule {
@Singleton
static StreamValidatorBuilder providesStreamValidatorBuilder(
@NonNull final BlockWriter<BlockItem> blockWriter,
@NonNull final BlockNodeContext blockNodeContext) {
return StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
return StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testPublishBlockStreamRegistrationAndExecution()
final var streamMediator =
LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build();
final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand Down Expand Up @@ -270,7 +270,7 @@ public void testSubscribeBlockStream() throws IOException {

// Build the BlockStreamService
final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand Down Expand Up @@ -724,7 +724,7 @@ private BlockStreamService buildBlockStreamService(
final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();

final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
return new BlockStreamService(
streamMediator,
blockReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void tearDown() {
public void testServiceName() throws IOException {

final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand All @@ -130,10 +130,10 @@ public void testServiceName() throws IOException {
}

@Test
public void testProto() throws IOException {
public void testProto() {

final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand All @@ -154,7 +154,7 @@ public void testProto() throws IOException {
void testSingleBlockHappyPath() throws IOException, ParseException {

final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockReader<Block> blockReader = BlockAsDirReaderBuilder.newBuilder(config).build();
final BlockStreamService blockStreamService =
new BlockStreamService(
Expand Down Expand Up @@ -215,7 +215,7 @@ void testSingleBlockNotFoundPath() throws IOException, ParseException {

// Call the service
final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand All @@ -235,7 +235,7 @@ void testSingleBlockNotFoundPath() throws IOException, ParseException {
void testSingleBlockServiceNotAvailable() {

final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand All @@ -262,7 +262,7 @@ void testSingleBlockServiceNotAvailable() {
@Test
public void testSingleBlockIOExceptionPath() throws IOException, ParseException {
final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand All @@ -289,7 +289,7 @@ public void testSingleBlockIOExceptionPath() throws IOException, ParseException
@Test
public void testSingleBlockParseExceptionPath() throws IOException, ParseException {
final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand Down Expand Up @@ -317,7 +317,7 @@ public void testSingleBlockParseExceptionPath() throws IOException, ParseExcepti
public void testUpdateInvokesRoutingWithLambdas() {

final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand All @@ -344,7 +344,7 @@ public void testProtocParseExceptionHandling() {
// TODO: We might be able to remove this test once we can remove the Translator class

final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext);
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final BlockStreamService blockStreamService =
new BlockStreamService(
streamMediator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -159,7 +158,12 @@ public void testMediatorPersistenceWithoutSubscribers() throws IOException {
// register the stream validator
when(blockWriter.write(blockItem)).thenReturn(Optional.empty());
final var streamValidator =
new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext);
new StreamValidatorImpl(
streamMediator,
blockWriter,
notifier,
blockNodeContext,
new ServiceStatusImpl());
streamMediator.subscribe(streamValidator);

// Acting as a producer, notify the mediator of a new block
Expand Down Expand Up @@ -218,7 +222,12 @@ public void testMediatorPublishEventToSubscribers() throws IOException {
// register the stream validator
when(blockWriter.write(blockItem)).thenReturn(Optional.empty());
final var streamValidator =
new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext);
new StreamValidatorImpl(
streamMediator,
blockWriter,
notifier,
blockNodeContext,
new ServiceStatusImpl());
streamMediator.subscribe(streamValidator);

// Acting as a producer, notify the mediator of a new block
Expand Down Expand Up @@ -288,7 +297,12 @@ public void testOnCancelSubscriptionHandling() throws IOException {
// register the stream validator
when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty());
final var streamValidator =
new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext);
new StreamValidatorImpl(
streamMediator,
blockWriter,
notifier,
blockNodeContext,
new ServiceStatusImpl());
streamMediator.subscribe(streamValidator);

// register the test observer
Expand Down Expand Up @@ -337,7 +351,12 @@ public void testOnCloseSubscriptionHandling() throws IOException {
// register the stream validator
when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty());
final var streamValidator =
new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext);
new StreamValidatorImpl(
streamMediator,
blockWriter,
notifier,
blockNodeContext,
new ServiceStatusImpl());
streamMediator.subscribe(streamValidator);

final var testConsumerBlockItemObserver =
Expand Down Expand Up @@ -372,7 +391,7 @@ public void testOnCloseSubscriptionHandling() throws IOException {
@Test
public void testMediatorBlocksPublishAfterException() throws IOException {

final ServiceStatus serviceStatus = mock(ServiceStatus.class);
final ServiceStatus serviceStatus = new ServiceStatusImpl();
final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();
final var streamMediator =
LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build();
Expand All @@ -399,16 +418,15 @@ public void testMediatorBlocksPublishAfterException() throws IOException {
.blockStreamService(blockStreamService)
.build();
final var streamValidator =
new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext);
new StreamValidatorImpl(
streamMediator, blockWriter, notifier, blockNodeContext, serviceStatus);

// Set up the stream verifier
streamMediator.subscribe(streamValidator);

final List<BlockItem> blockItems = generateBlockItems(1);
final BlockItem firstBlockItem = blockItems.getFirst();

when(serviceStatus.isRunning()).thenReturn(true);

// Right now, only a single producer calls publishEvent. In
// that case, they will get an IOException bubbled up to them.
// However, we will need to support multiple producers in the
Expand All @@ -419,8 +437,6 @@ public void testMediatorBlocksPublishAfterException() throws IOException {
streamMediator.publish(firstBlockItem);

verify(blockStreamService, timeout(testTimeout).times(1)).notifyUnrecoverableError();
verify(serviceStatus, timeout(testTimeout).times(1)).isRunning();
verify(serviceStatus, timeout(testTimeout).times(1)).stopRunning(any());

// Confirm the counter was incremented only once
assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get());
Expand All @@ -446,7 +462,7 @@ public void testMediatorBlocksPublishAfterException() throws IOException {
verify(streamObserver2, timeout(testTimeout).times(1)).onNext(fromPbj(endOfStreamResponse));
verify(streamObserver3, timeout(testTimeout).times(1)).onNext(fromPbj(endOfStreamResponse));

// once despite the second block being published.
// verify write method only called once despite the second block being published.
verify(blockWriter, timeout(testTimeout).times(1)).write(firstBlockItem);
}

Expand All @@ -460,7 +476,12 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException {

// register the stream validator
final var streamValidator =
new StreamValidatorImpl(streamMediator, blockWriter, notifier, blockNodeContext);
new StreamValidatorImpl(
streamMediator,
blockWriter,
notifier,
blockNodeContext,
new ServiceStatusImpl());
streamMediator.subscribe(streamValidator);

final var testConsumerBlockItemObserver =
Expand Down

0 comments on commit c9f1d26

Please sign in to comment.