Skip to content

Commit

Permalink
Improve exception logging when we fail to index / transform message (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 authored Mar 7, 2024
1 parent 0a2debf commit 61aeb6f
Showing 1 changed file with 13 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
// Decode message
StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
StreamPartitionMsgOffset messageOffset = messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
if (decodedRow.getException() != null) {
// TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
// decode error
Expand All @@ -587,7 +588,8 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
_numRowsErrored++;
// when exception happens we prefer abandoning the whole batch and not partially indexing some rows
reusedResult.getTransformedRows().clear();
String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
String errorMessage = String.format("Caught exception while transforming the record at offset: %s , row: %s",
messageOffset, decodedRow.getResult());
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
}
Expand Down Expand Up @@ -617,14 +619,14 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
} catch (Exception e) {
_numRowsErrored++;
String errorMessage = String.format("Caught exception while indexing the record: %s", transformedRow);
String errorMessage = String.format("Caught exception while indexing the record at offset: %s , row: %s",
messageOffset, transformedRow);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
new SegmentErrorInfo(now(), errorMessage, e));
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
}
}
}
_currentOffset = messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
_currentOffset = messageOffset;
_numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
_numRowsConsumed++;
streamMessageCount++;
Expand Down Expand Up @@ -799,8 +801,7 @@ public void run() {
_segmentLogger.error(errorMessage, e);
postStopConsumedMsg(e.getClass().getName());
_state = State.ERROR;
_realtimeTableDataManager
.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
return;
}
Expand Down Expand Up @@ -980,8 +981,7 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
String errorMessage =
String.format("Caught exception while moving index directory from: %s to: %s", tempIndexDir, indexDir);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager
.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
return null;
} finally {
FileUtils.deleteQuietly(tempSegmentFolder);
Expand All @@ -1001,8 +1001,7 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
String errorMessage =
String.format("Caught exception while taring index directory from: %s to: %s", indexDir, segmentTarFile);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager
.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
return null;
}

Expand All @@ -1011,17 +1010,15 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
String errorMessage = String.format("Failed to find file: %s under index directory: %s",
V1Constants.MetadataKeys.METADATA_FILE_NAME, indexDir);
_segmentLogger.error(errorMessage);
_realtimeTableDataManager
.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null));
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null));
return null;
}
File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
if (creationMetaFile == null) {
String errorMessage = String.format("Failed to find file: %s under index directory: %s",
V1Constants.SEGMENT_CREATION_META, indexDir);
_segmentLogger.error(errorMessage);
_realtimeTableDataManager
.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null));
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null));
return null;
}
Map<String, File> metadataFiles = new HashMap<>();
Expand All @@ -1037,8 +1034,7 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
} catch (InterruptedException e) {
String errorMessage = "Interrupted while waiting for semaphore";
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager
.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
return null;
} finally {
if (_segBuildSemaphore != null) {
Expand Down

0 comments on commit 61aeb6f

Please sign in to comment.