diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index f3740e997..aa3ae2d55 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -62,7 +62,7 @@ public BlockStreamService( this.timeoutThresholdMillis = timeoutThresholdMillis; this.itemAckBuilder = itemAckBuilder; this.streamMediator = streamMediator; - this.serviceStatus = new ServiceStatus(); + this.serviceStatus = new ServiceStatusImpl(); } /** diff --git a/server/src/main/java/com/hedera/block/server/ServiceStatus.java b/server/src/main/java/com/hedera/block/server/ServiceStatus.java index c43397142..b5f0e8f0c 100644 --- a/server/src/main/java/com/hedera/block/server/ServiceStatus.java +++ b/server/src/main/java/com/hedera/block/server/ServiceStatus.java @@ -1,45 +1,13 @@ -/* - * Copyright (C) 2024 Hedera Hashgraph, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.hedera.block.server; import io.helidon.webserver.WebServer; -public class ServiceStatus { - private boolean isRunning; - private WebServer webServer; - - public ServiceStatus() { - this.isRunning = true; - } - - public boolean isRunning() { - return isRunning; - } +public interface ServiceStatus { + boolean isRunning(); - public void setRunning(final boolean running) { - isRunning = running; - } + void setRunning(final boolean running); - public void setWebServer(final WebServer webServer) { - this.webServer = webServer; - } + void setWebServer(final WebServer webServer); - public void stopService() { - isRunning = false; - webServer.stop(); - } + void stopService(); } diff --git a/server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java b/server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java new file mode 100644 index 000000000..955d459a8 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server; + +import io.helidon.webserver.WebServer; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class ServiceStatusImpl implements ServiceStatus { + private final AtomicBoolean isRunning = new AtomicBoolean(true); + private WebServer webServer; + + public boolean isRunning() { + return isRunning.get(); + } + + public void setRunning(final boolean running) { + isRunning.set(running); + } + + public void setWebServer(final WebServer webServer) { + this.webServer = webServer; + } + + public void stopService() { + isRunning.set(false); + webServer.stop(); + } +} diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 94a9e5b3b..859bc3aaa 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -16,8 +16,7 @@ package com.hedera.block.server.mediator; -import static com.hedera.block.protos.BlockStreamService.*; - +import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.consumer.BlockItemEventHandler; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.persistence.BlockPersistenceHandler; @@ -26,12 +25,14 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; + import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; + +import static com.hedera.block.protos.BlockStreamService.*; /** * LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible @@ -54,7 +55,7 @@ public class LiveStreamMediatorImpl private final BlockPersistenceHandler blockPersistenceHandler; - private final AtomicBoolean isPublishing = new AtomicBoolean(true); + private ServiceStatus serviceStatus; /** * Constructor for the LiveStreamMediatorImpl class. @@ -87,7 +88,7 @@ public LiveStreamMediatorImpl( public void publishEvent(final BlockItem blockItem) throws IOException { try { - if (isPublishing.get()) { + if (serviceStatus.isRunning()) { // Publish the block for all subscribers to receive LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem); @@ -103,7 +104,7 @@ public void publishEvent(final BlockItem blockItem) throws IOException { } } catch (IOException e) { // Disable publishing BlockItems from upstream producers - isPublishing.set(false); + serviceStatus.setRunning(false); LOGGER.log( System.Logger.Level.ERROR, "An exception occurred while attempting to persist the BlockItem: {0}", @@ -128,7 +129,7 @@ public void publishEvent(final BlockItem blockItem) throws IOException { @Override public boolean isPublishing() { - return isPublishing.get(); + return serviceStatus.isRunning(); } @Override @@ -178,4 +179,9 @@ private static SubscribeStreamResponse buildEndStreamResponse() { // TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code? return SubscribeStreamResponse.newBuilder().setStatus(2).build(); } + + @Override + public void register(final ServiceStatus serviceStatus) { + this.serviceStatus = serviceStatus; + } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java index 65c84a360..8eaf00498 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java +++ b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java @@ -16,7 +16,9 @@ package com.hedera.block.server.mediator; +import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.consumer.BlockItemEventHandler; + import java.io.IOException; /** @@ -41,4 +43,6 @@ public interface StreamMediator { void unsubscribe(final BlockItemEventHandler handler); boolean isSubscribed(final BlockItemEventHandler handler); + + void register(final ServiceStatus serviceStatus); } diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index 9aa0cb810..a27da3662 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -16,16 +16,17 @@ package com.hedera.block.server.producer; -import static com.hedera.block.protos.BlockStreamService.*; -import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*; - import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; import io.grpc.stub.StreamObserver; + import java.io.IOException; import java.security.NoSuchAlgorithmException; +import static com.hedera.block.protos.BlockStreamService.*; +import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*; + /** * The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC * service implementation. Helidon calls methods on this class as networking events occur with the