Skip to content

Commit

Permalink
fix:removed producer cancel hook
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 1, 2024
1 parent d03ef75 commit d78eebe
Showing 1 changed file with 8 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -56,18 +55,6 @@ public ProducerBlockItemObserver(
this.publishStreamResponseObserver = publishStreamResponseObserver;
this.itemAckBuilder = itemAckBuilder;
this.serviceStatus = serviceStatus;

if (publishStreamResponseObserver
instanceof
ServerCallStreamObserver<PublishStreamResponse>
serverCallStreamObserver) {
serverCallStreamObserver.setOnCancelHandler(
() -> {
LOGGER.log(
System.Logger.Level.INFO, "ProducerBlockStreamObserver cancelled");
serviceStatus.stopService();
});
}
}

/**
Expand All @@ -89,11 +76,8 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
streamMediator.publishEvent(blockItem);

try {
// Build the response
final ItemAcknowledgement itemAck = itemAckBuilder.buildAck(blockItem);
final PublishStreamResponse publishStreamResponse =
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();
publishStreamResponseObserver.onNext(publishStreamResponse);
// Send a successful response
publishStreamResponseObserver.onNext(buildSuccessStreamResponse(blockItem));

} catch (IOException | NoSuchAlgorithmException e) {
final var errorResponse = buildErrorStreamResponse();
Expand All @@ -117,6 +101,12 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
}
}

private PublishStreamResponse buildSuccessStreamResponse(final BlockItem blockItem)
throws IOException, NoSuchAlgorithmException {
final ItemAcknowledgement itemAck = itemAckBuilder.buildAck(blockItem);
return PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();
}

private static PublishStreamResponse buildErrorStreamResponse() {
// TODO: Replace this with a real error enum.
final EndOfStream endOfStream =
Expand Down

0 comments on commit d78eebe

Please sign in to comment.