Skip to content

Commit

Permalink
Improve tests when errors are received in the consumer thread (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
gortiz authored Oct 23, 2023
1 parent 8c87710 commit 25b1fcb
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,12 @@ private void handleTransientStreamErrors(Exception e)
_serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
1L);
if (_consecutiveErrorCount > MAX_CONSECUTIVE_ERROR_COUNT) {
_segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after {} attempts",
_consecutiveErrorCount, e);
_segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after "
+ _consecutiveErrorCount + " attempts", e);
throw e;
} else {
_segmentLogger
.warn("Stream transient exception when fetching messages, retrying (count={})", _consecutiveErrorCount, e);
_segmentLogger.warn("Stream transient exception when fetching messages, retrying (count="
+ _consecutiveErrorCount + ")", e);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
recreateStreamConsumer("Too many transient errors");
}
Expand Down Expand Up @@ -444,6 +444,9 @@ protected boolean consumeLoop()
// One such exception seen so far is java.net.SocketTimeoutException
handleTransientStreamErrors(e);
continue;
} catch (Throwable t) {
_segmentLogger.warn("Stream error when fetching messages, stopping consumption", t);
throw t;
}

boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
Expand Down

0 comments on commit 25b1fcb

Please sign in to comment.