Skip to content

Commit

Permalink
fix: fixed error and debug messages
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 22, 2024
1 parent 9d64c30 commit 05475fc
Showing 1 changed file with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class ConsumerStreamResponseObserver
private final ResponseSender statusResponseSender = new StatusResponseSender();
private final ResponseSender blockItemResponseSender = new BlockItemResponseSender();

private static final String PROTOCOL_VIOLATION_MESSAGE =
"Protocol Violation. %s is OneOf type %s but %s is null.\n%s";

/**
* The onCancel handler to execute when the consumer cancels the stream. This handler is
* protected to facilitate testing.
Expand Down Expand Up @@ -153,7 +156,9 @@ public void onEvent(
final long currentMillis = producerLivenessClock.millis();
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
subscriptionHandler.unsubscribe(this);
LOGGER.log(DEBUG, "Unsubscribed ConsumerBlockItemObserver due to producer timeout");
LOGGER.log(
DEBUG,
"Producer liveness timeout. Unsubscribed ConsumerBlockItemObserver.");
} else {
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
producerLivenessMillis = currentMillis;
Expand Down Expand Up @@ -192,20 +197,21 @@ public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse)
// the beginning of a block.
final BlockItem blockItem = subscribeStreamResponse.blockItem();
if (blockItem == null) {
LOGGER.log(
ERROR,
"SubscribeStreamResponse was of type BLOCK_ITEM but block_item is null."
+ " This is a protocol violation: {0}",
subscribeStreamResponse.toString());
throw new IllegalArgumentException(
"SubscribeStreamResponse was of type BlockItem but block_item is null");
final String message =
PROTOCOL_VIOLATION_MESSAGE.formatted(
"SubscribeStreamResponse",
"BLOCK_ITEM",
"block_item",
subscribeStreamResponse);
LOGGER.log(ERROR, message);
throw new IllegalArgumentException(message);
} else {
if (!streamStarted && blockItem.hasBlockHeader()) {
streamStarted = true;
}

if (streamStarted) {
LOGGER.log(DEBUG, "Send BlockItem downstream: {0} ", blockItem);
LOGGER.log(DEBUG, "Sending BlockItem downstream: {0}", blockItem);
subscribeStreamResponseObserver.onNext(
toProtocSubscribeStreamResponse(subscribeStreamResponse));
}
Expand All @@ -217,7 +223,7 @@ private final class StatusResponseSender implements ResponseSender {
public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse) {
LOGGER.log(
DEBUG,
"Send SubscribeStreamResponse downstream: {0} ",
"Sending SubscribeStreamResponse downstream: {0} ",
subscribeStreamResponse);
subscribeStreamResponseObserver.onNext(
toProtocSubscribeStreamResponse(subscribeStreamResponse));
Expand Down

0 comments on commit 05475fc

Please sign in to comment.