Skip to content

Commit

Permalink
fix: removed unsubscribeall. we do not want a producer to evict all c…
Browse files Browse the repository at this point in the history
…onsumers

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jun 27, 2024
1 parent 6d3e617 commit 0418475
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public void subscribe(final LiveStreamObserver<BlockStreamServiceGrpcProto.Block
public void unsubscribe(final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver) {
if (subscribers.remove(liveStreamObserver)) {
LOGGER.log(System.Logger.Level.DEBUG, "Successfully removed observer from subscription list");
} else {
LOGGER.log(System.Logger.Level.ERROR, "Failed to remove observer from subscription list");
}
}

Expand All @@ -80,15 +78,6 @@ public boolean isSubscribed(final LiveStreamObserver<BlockStreamServiceGrpcProto
return subscribers.contains(observer);
}

/**
* Unsubscribe all observers from the mediator
*/
@Override
public void unsubscribeAll() {
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribing all observers from the mediator");
subscribers.clear();
}

/**
* Notify all observers of a new block
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ public interface StreamMediator<U, V> {
*/
boolean isSubscribed(final LiveStreamObserver<U, V> observer);

/**
* Unsubscribes all LiveStreamObservers from the producer
*/
void unsubscribeAll();

/**
* Passes the newly arrived block to all subscribers
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ public ProducerBlockStreamObserver(final StreamMediator<BlockStreamServiceGrpcPr
*/
@Override
public void onNext(final BlockStreamServiceGrpcProto.Block block) {

// Notify all the mediator subscribers
streamMediator.notifyAll(block);

// Send a response back to the upstream producer
final BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().setId(block.getId()).build();
responseStreamObserver.onNext(blockResponse);
}
Expand All @@ -69,6 +72,7 @@ public void onNext(final BlockStreamServiceGrpcProto.Block block) {
@Override
public void onError(final Throwable t) {
LOGGER.log(System.Logger.Level.ERROR, "onError method invoked with an exception", t);
responseStreamObserver.onError(t);
}

/**
Expand All @@ -78,7 +82,6 @@ public void onError(final Throwable t) {
@Override
public void onCompleted() {
LOGGER.log(System.Logger.Level.DEBUG, "ProducerBlockStreamObserver completed");
streamMediator.unsubscribeAll();
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribed all downstream consumers");
responseStreamObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,6 @@ public class LiveStreamMediatorImplTest {
@Mock
private BlockCache<BlockStreamServiceGrpcProto.Block> blockCache;

@Test
public void testUnsubscribeAll() {

final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));

// Set up the subscribers
streamMediator.subscribe(liveStreamObserver1);
streamMediator.subscribe(liveStreamObserver2);
streamMediator.subscribe(liveStreamObserver3);

assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed");
assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed");
assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed");

streamMediator.unsubscribeAll();

assertFalse(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have unsubscribed liveStreamObserver1");
assertFalse(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have unsubscribed liveStreamObserver2");
assertFalse(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have unsubscribed liveStreamObserver3");
}

@Test
public void testUnsubscribeEach() {

Expand Down

0 comments on commit 0418475

Please sign in to comment.