Skip to content

Commit

Permalink
log processing errors
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed May 14, 2018
1 parent 106b302 commit 55830a9
Showing 1 changed file with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ public Mono<PublishReply> publish(Mono<PublishRequest> requestMono) {
))
.transform(mono -> {
for (RecordPreProcessor processor : recordPreProcessorChain.getAll()) {
mono = mono.flatMap(envelope -> Mono.fromCompletionStage(processor.preProcess(envelope)));
mono = mono.flatMap(envelope -> {
try {
return Mono.fromCompletionStage(processor.preProcess(envelope));
} catch (Throwable e) {
return Mono.error(new RuntimeException(processor + " failed", e));
}
});
}
return mono;
})
Expand All @@ -69,7 +75,8 @@ public Mono<PublishReply> publish(Mono<PublishRequest> requestMono) {
.setPartition(it.getPartition())
.setOffset(it.getOffset())
.build()
);
)
.log("publish", Level.SEVERE, SignalType.ON_ERROR);
}

@Override
Expand Down Expand Up @@ -124,7 +131,7 @@ public Flux<SubscribeReply> subscribe(Mono<SubscribeRequest> requestFlux) {
subscriptions.remove(sessionId, subscription);
});
})
.log("subscribe", Level.WARNING, SignalType.ON_ERROR);
.log("subscribe", Level.SEVERE, SignalType.ON_ERROR);
}

@Override
Expand Down Expand Up @@ -162,7 +169,7 @@ public Flux<ReceiveReply> receive(Mono<ReceiveRequest> requestMono) {
.build()
);
})
.log("receive", Level.WARNING, SignalType.ON_ERROR);
.log("receive", Level.SEVERE, SignalType.ON_ERROR);
}

@Override
Expand All @@ -184,7 +191,7 @@ public Mono<Empty> ack(Mono<AckRequest> request) {
));
})
.then(Mono.just(Empty.getDefaultInstance()))
.log("ack", Level.WARNING, SignalType.ON_ERROR);
.log("ack", Level.SEVERE, SignalType.ON_ERROR);
}

@Override
Expand Down

0 comments on commit 55830a9

Please sign in to comment.