From 0fed3db2c1a68560c50c61336491698eb64685e8 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Mon, 29 Jul 2024 14:47:47 -0600 Subject: [PATCH] fix:fixed unit tests Signed-off-by: Matt Peterson --- .../main/java/com/hedera/block/server/Server.java | 12 +++++++++--- .../server/mediator/LiveStreamMediatorImpl.java | 7 +++++-- .../hedera/block/server/BlockStreamServiceIT.java | 13 +++++++++++-- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index fae460c2b..b71eeb675 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -62,8 +62,9 @@ public static void main(final String[] args) { Config.global(config); try { + final ServiceStatus serviceStatus = new ServiceStatusImpl(); final StreamMediator> streamMediator = - buildStreamMediator(config); + buildStreamMediator(config, serviceStatus); final BlockStreamService blockStreamService = buildBlockStreamService(config, streamMediator); final GrpcRouting.Builder grpcRouting = buildGrpcRouting(blockStreamService); @@ -74,6 +75,10 @@ public static void main(final String[] args) { blockStreamService.register(webServer); + serviceStatus.setWebServer(webServer); + streamMediator.register(serviceStatus); + + // Start the web server webServer.start(); } catch (IOException e) { @@ -82,12 +87,13 @@ public static void main(final String[] args) { } private static StreamMediator> - buildStreamMediator(final Config config) throws IOException { + buildStreamMediator(final Config config, final ServiceStatus serviceStatus) throws IOException { return new LiveStreamMediatorImpl( new ConcurrentHashMap<>(32), new FileSystemPersistenceHandler( new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config), - new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config))); + new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config)), + serviceStatus); } private static GrpcRouting.Builder buildGrpcRouting( diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 859bc3aaa..3c0c59511 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -17,6 +17,7 @@ package com.hedera.block.server.mediator; import com.hedera.block.server.ServiceStatus; +import com.hedera.block.server.ServiceStatusImpl; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.persistence.BlockPersistenceHandler; @@ -67,7 +68,8 @@ public LiveStreamMediatorImpl( BlockItemEventHandler>, BatchEventProcessor>> subscribers, - final BlockPersistenceHandler blockPersistenceHandler) { + final BlockPersistenceHandler blockPersistenceHandler, + final ServiceStatus serviceStatus) { this.subscribers = subscribers; this.blockPersistenceHandler = blockPersistenceHandler; @@ -77,11 +79,12 @@ public LiveStreamMediatorImpl( new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE); this.ringBuffer = disruptor.start(); this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + this.serviceStatus = serviceStatus; } public LiveStreamMediatorImpl( final BlockPersistenceHandler blockPersistenceHandler) { - this(new ConcurrentHashMap<>(), blockPersistenceHandler); + this(new ConcurrentHashMap<>(), blockPersistenceHandler, new ServiceStatusImpl()); } @Override diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index a9ab76414..c5bf89e69 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -145,10 +145,15 @@ public void testPublishBlockStreamRegistrationAndExecution() @Test public void testSubscribeBlockStream() throws InterruptedException { + + final ServiceStatus serviceStatus = new ServiceStatusImpl(); + serviceStatus.setWebServer(webServer); + final var streamMediator = new LiveStreamMediatorImpl( new HashMap<>(), - new FileSystemPersistenceHandler(blockReader, blockWriter)); + new FileSystemPersistenceHandler(blockReader, blockWriter), + serviceStatus); // Build the BlockStreamService final BlockStreamService blockStreamService = @@ -520,8 +525,12 @@ private StreamMediator> buildStr new BlockAsDirReader(JUNIT, testConfig, blockRemover, filePerms); final BlockWriter blockWriter = new BlockAsDirWriter(JUNIT, testConfig, blockRemover, filePerms); + + final ServiceStatus serviceStatus = new ServiceStatusImpl(); + serviceStatus.setWebServer(webServer); return new LiveStreamMediatorImpl( - subscribers, new FileSystemPersistenceHandler(blockReader, blockWriter)); + subscribers, new FileSystemPersistenceHandler(blockReader, blockWriter), + serviceStatus); } private BlockStreamService buildBlockStreamService() throws IOException {