Skip to content

Commit

Permalink
wip: used an interface
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 29, 2024
1 parent 3385041 commit bf9c3da
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public BlockStreamService(
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
this.serviceStatus = new ServiceStatus();
this.serviceStatus = new ServiceStatusImpl();
}

/**
Expand Down
42 changes: 5 additions & 37 deletions server/src/main/java/com/hedera/block/server/ServiceStatus.java
Original file line number Diff line number Diff line change
@@ -1,45 +1,13 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

import io.helidon.webserver.WebServer;

public class ServiceStatus {
private boolean isRunning;
private WebServer webServer;

public ServiceStatus() {
this.isRunning = true;
}

public boolean isRunning() {
return isRunning;
}
public interface ServiceStatus {
boolean isRunning();

public void setRunning(final boolean running) {
isRunning = running;
}
void setRunning(final boolean running);

public void setWebServer(final WebServer webServer) {
this.webServer = webServer;
}
void setWebServer(final WebServer webServer);

public void stopService() {
isRunning = false;
webServer.stop();
}
void stopService();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server;

import io.helidon.webserver.WebServer;

import java.util.concurrent.atomic.AtomicBoolean;

public class ServiceStatusImpl implements ServiceStatus {
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private WebServer webServer;

public boolean isRunning() {
return isRunning.get();
}

public void setRunning(final boolean running) {
isRunning.set(running);
}

public void setWebServer(final WebServer webServer) {
this.webServer = webServer;
}

public void stopService() {
isRunning.set(false);
webServer.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package com.hedera.block.server.mediator;

import static com.hedera.block.protos.BlockStreamService.*;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.consumer.BlockItemEventHandler;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.persistence.BlockPersistenceHandler;
Expand All @@ -26,12 +25,14 @@
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.hedera.block.protos.BlockStreamService.*;

/**
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
Expand All @@ -54,7 +55,7 @@ public class LiveStreamMediatorImpl

private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;

private final AtomicBoolean isPublishing = new AtomicBoolean(true);
private ServiceStatus serviceStatus;

/**
* Constructor for the LiveStreamMediatorImpl class.
Expand Down Expand Up @@ -87,7 +88,7 @@ public LiveStreamMediatorImpl(
public void publishEvent(final BlockItem blockItem) throws IOException {

try {
if (isPublishing.get()) {
if (serviceStatus.isRunning()) {

// Publish the block for all subscribers to receive
LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem);
Expand All @@ -103,7 +104,7 @@ public void publishEvent(final BlockItem blockItem) throws IOException {
}
} catch (IOException e) {
// Disable publishing BlockItems from upstream producers
isPublishing.set(false);
serviceStatus.setRunning(false);
LOGGER.log(
System.Logger.Level.ERROR,
"An exception occurred while attempting to persist the BlockItem: {0}",
Expand All @@ -128,7 +129,7 @@ public void publishEvent(final BlockItem blockItem) throws IOException {

@Override
public boolean isPublishing() {
return isPublishing.get();
return serviceStatus.isRunning();
}

@Override
Expand Down Expand Up @@ -178,4 +179,9 @@ private static SubscribeStreamResponse buildEndStreamResponse() {
// TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code?
return SubscribeStreamResponse.newBuilder().setStatus(2).build();
}

@Override
public void register(final ServiceStatus serviceStatus) {
this.serviceStatus = serviceStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.hedera.block.server.mediator;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.consumer.BlockItemEventHandler;

import java.io.IOException;

/**
Expand All @@ -41,4 +43,6 @@ public interface StreamMediator<U, V> {
void unsubscribe(final BlockItemEventHandler<V> handler);

boolean isSubscribed(final BlockItemEventHandler<V> handler);

void register(final ServiceStatus serviceStatus);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package com.hedera.block.server.producer;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;

/**
* The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC
* service implementation. Helidon calls methods on this class as networking events occur with the
Expand Down

0 comments on commit bf9c3da

Please sign in to comment.