Skip to content

Commit

Permalink
fix: removing this prefixes when they are not needed
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jun 25, 2024
1 parent d60a907 commit 2f51579
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer
*
*/
public LiveStreamObserverImpl(final long timeoutThresholdMillis,
StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator,
StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator,
final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {

this.mediator = mediator;
this.responseStreamObserver = responseStreamObserver;
Expand All @@ -65,14 +65,14 @@ public LiveStreamObserverImpl(final long timeoutThresholdMillis,
@Override
public void notify(final BlockStreamServiceGrpcProto.Block block) {

if (System.currentTimeMillis() - this.consumerLivenessMillis > timeoutThresholdMillis) {
if (System.currentTimeMillis() - consumerLivenessMillis > timeoutThresholdMillis) {
if (mediator.isSubscribed(this)) {
LOGGER.log(System.Logger.Level.DEBUG, "Consumer timeout threshold exceeded. Unsubscribing observer.");
mediator.unsubscribe(this);
}
} else {
this.producerLivenessMillis = System.currentTimeMillis();
this.responseStreamObserver.onNext(block);
producerLivenessMillis = System.currentTimeMillis();
responseStreamObserver.onNext(block);
}
}

Expand All @@ -84,14 +84,14 @@ public void notify(final BlockStreamServiceGrpcProto.Block block) {
@Override
public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) {

if (System.currentTimeMillis() - this.producerLivenessMillis > timeoutThresholdMillis) {
if (System.currentTimeMillis() - producerLivenessMillis > timeoutThresholdMillis) {
if (mediator.isSubscribed(this)) {
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
mediator.unsubscribe(this);
}
} else {
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse);
this.consumerLivenessMillis = System.currentTimeMillis();
consumerLivenessMillis = System.currentTimeMillis();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible for managing
* the subscription and unsubscription operations of downstream consumers. It also proxies new blocks
* to the subscribers as they arrive and persists the blocks to the block persistence store.
*
*/
public class LiveStreamMediatorImpl implements StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> {

Expand All @@ -56,7 +55,7 @@ public LiveStreamMediatorImpl(final BlockPersistenceHandler<BlockStreamServiceGr
*/
@Override
public void subscribe(final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver) {
this.subscribers.add(liveStreamObserver);
subscribers.add(liveStreamObserver);
}

/**
Expand All @@ -66,7 +65,7 @@ public void subscribe(final LiveStreamObserver<BlockStreamServiceGrpcProto.Block
*/
@Override
public void unsubscribe(final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver) {
if (this.subscribers.remove(liveStreamObserver)) {
if (subscribers.remove(liveStreamObserver)) {
LOGGER.log(System.Logger.Level.DEBUG, "Successfully removed observer from subscription list");
} else {
LOGGER.log(System.Logger.Level.ERROR, "Failed to remove observer from subscription list");
Expand All @@ -81,15 +80,15 @@ public void unsubscribe(final LiveStreamObserver<BlockStreamServiceGrpcProto.Blo
*/
@Override
public boolean isSubscribed(final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> observer) {
return this.subscribers.contains(observer);
return subscribers.contains(observer);
}

/**
* Unsubscribe all observers from the mediator
*/
@Override
public void unsubscribeAll() {
this.subscribers.clear();
subscribers.clear();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void onNext(final BlockStreamServiceGrpcProto.Block block) {
streamMediator.notifyAll(block);

final BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().setId(block.getId()).build();
this.responseStreamObserver.onNext(blockResponse);
responseStreamObserver.onNext(blockResponse);
}

/**
Expand All @@ -82,7 +82,7 @@ public void onError(final Throwable t) {
@Override
public void onCompleted() {
LOGGER.log(System.Logger.Level.DEBUG, "ProducerBlockStreamObserver completed");
this.streamMediator.unsubscribeAll();
streamMediator.unsubscribeAll();
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribed all downstream consumers");
}
}

0 comments on commit 2f51579

Please sign in to comment.