From 25b1fcbcf0a8003b63af8e59bda109a89dae490d Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Jaureguizar Date: Mon, 23 Oct 2023 23:19:32 +0200 Subject: [PATCH] Improve tests when errors are received in the consumer thread (#11858) --- .../manager/realtime/RealtimeSegmentDataManager.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index f3e8fc588e9e..f954eff0e180 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -391,12 +391,12 @@ private void handleTransientStreamErrors(Exception e) _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); if (_consecutiveErrorCount > MAX_CONSECUTIVE_ERROR_COUNT) { - _segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after {} attempts", - _consecutiveErrorCount, e); + _segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after " + + _consecutiveErrorCount + " attempts", e); throw e; } else { - _segmentLogger - .warn("Stream transient exception when fetching messages, retrying (count={})", _consecutiveErrorCount, e); + _segmentLogger.warn("Stream transient exception when fetching messages, retrying (count=" + + _consecutiveErrorCount + ")", e); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); recreateStreamConsumer("Too many transient errors"); } @@ -444,6 +444,9 @@ protected boolean consumeLoop() // One such exception seen so far is java.net.SocketTimeoutException handleTransientStreamErrors(e); continue; + } catch (Throwable t) { + _segmentLogger.warn("Stream error when fetching messages, stopping consumption", t); + throw t; } boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);