diff --git a/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java index 3b7d6bd3c..241c33055 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java +++ b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java @@ -46,8 +46,8 @@ public class LiveStreamObserverImpl implements LiveStreamObserver mediator, - StreamObserver responseStreamObserver) { + final StreamMediator mediator, + final StreamObserver responseStreamObserver) { this.mediator = mediator; this.responseStreamObserver = responseStreamObserver; @@ -65,14 +65,14 @@ public LiveStreamObserverImpl(final long timeoutThresholdMillis, @Override public void notify(final BlockStreamServiceGrpcProto.Block block) { - if (System.currentTimeMillis() - this.consumerLivenessMillis > timeoutThresholdMillis) { + if (System.currentTimeMillis() - consumerLivenessMillis > timeoutThresholdMillis) { if (mediator.isSubscribed(this)) { LOGGER.log(System.Logger.Level.DEBUG, "Consumer timeout threshold exceeded. Unsubscribing observer."); mediator.unsubscribe(this); } } else { - this.producerLivenessMillis = System.currentTimeMillis(); - this.responseStreamObserver.onNext(block); + producerLivenessMillis = System.currentTimeMillis(); + responseStreamObserver.onNext(block); } } @@ -84,14 +84,14 @@ public void notify(final BlockStreamServiceGrpcProto.Block block) { @Override public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) { - if (System.currentTimeMillis() - this.producerLivenessMillis > timeoutThresholdMillis) { + if (System.currentTimeMillis() - producerLivenessMillis > timeoutThresholdMillis) { if (mediator.isSubscribed(this)) { LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer."); mediator.unsubscribe(this); } } else { LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse); - this.consumerLivenessMillis = System.currentTimeMillis(); + consumerLivenessMillis = System.currentTimeMillis(); } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index c5af1a99d..87f754f64 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -30,7 +30,6 @@ * LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible for managing * the subscription and unsubscription operations of downstream consumers. It also proxies new blocks * to the subscribers as they arrive and persists the blocks to the block persistence store. - * */ public class LiveStreamMediatorImpl implements StreamMediator { @@ -56,7 +55,7 @@ public LiveStreamMediatorImpl(final BlockPersistenceHandler liveStreamObserver) { - this.subscribers.add(liveStreamObserver); + subscribers.add(liveStreamObserver); } /** @@ -66,7 +65,7 @@ public void subscribe(final LiveStreamObserver liveStreamObserver) { - if (this.subscribers.remove(liveStreamObserver)) { + if (subscribers.remove(liveStreamObserver)) { LOGGER.log(System.Logger.Level.DEBUG, "Successfully removed observer from subscription list"); } else { LOGGER.log(System.Logger.Level.ERROR, "Failed to remove observer from subscription list"); @@ -81,7 +80,7 @@ public void unsubscribe(final LiveStreamObserver observer) { - return this.subscribers.contains(observer); + return subscribers.contains(observer); } /** @@ -89,7 +88,7 @@ public boolean isSubscribed(final LiveStreamObserver