From 078912d55a46ddf496d43bee4b2dbf1142247c8a Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Wed, 31 Jul 2024 11:44:22 -0600 Subject: [PATCH] fix:removed producer cancel hook Signed-off-by: Matt Peterson --- .../producer/ProducerBlockItemObserver.java | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index a935a9990..7847b994d 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -22,7 +22,6 @@ import com.hedera.block.server.ServiceStatus; import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; -import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.security.NoSuchAlgorithmException; @@ -56,18 +55,6 @@ public ProducerBlockItemObserver( this.publishStreamResponseObserver = publishStreamResponseObserver; this.itemAckBuilder = itemAckBuilder; this.serviceStatus = serviceStatus; - - if (publishStreamResponseObserver - instanceof - ServerCallStreamObserver - serverCallStreamObserver) { - serverCallStreamObserver.setOnCancelHandler( - () -> { - LOGGER.log( - System.Logger.Level.INFO, "ProducerBlockStreamObserver cancelled"); - serviceStatus.stopService(); - }); - } } /** @@ -89,11 +76,8 @@ public void onNext(final PublishStreamRequest publishStreamRequest) { streamMediator.publishEvent(blockItem); try { - // Build the response - final ItemAcknowledgement itemAck = itemAckBuilder.buildAck(blockItem); - final PublishStreamResponse publishStreamResponse = - PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build(); - publishStreamResponseObserver.onNext(publishStreamResponse); + // Send a successful response + publishStreamResponseObserver.onNext(buildSuccessStreamResponse(blockItem)); } catch (IOException | NoSuchAlgorithmException e) { final var errorResponse = buildErrorStreamResponse(); @@ -117,6 +101,12 @@ public void onNext(final PublishStreamRequest publishStreamRequest) { } } + private PublishStreamResponse buildSuccessStreamResponse(final BlockItem blockItem) + throws IOException, NoSuchAlgorithmException { + final ItemAcknowledgement itemAck = itemAckBuilder.buildAck(blockItem); + return PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build(); + } + private static PublishStreamResponse buildErrorStreamResponse() { // TODO: Replace this with a real error enum. final EndOfStream endOfStream =