From ce3660125fa471a95e3ca594d0656137c2c19ae0 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Wed, 3 Nov 2021 15:27:59 -0700 Subject: [PATCH] bugfix: rust command log freezes (#362) We've observed that the command log output may freeze under high load. This happens due to the implementation of the `SamplingLogger`. By changing the logic, it will be able to handle the edge case that was unaccounted for in the initial implementation. This brings the `SamplingLogger` implementation more in-line with the original C implementation of klog. Fixes the `log_skip` metric to represent the number of log messages omitted due to sampling. Removes `log_skip_bytes` so we don't pay to format a log message we will omit. Adds `log_drop` and `log_drop_bytes` to capture log messages which are dropped due to a full queue between the logger and its drain. --- src/rust/logger/src/lib.rs | 5 ++++- src/rust/logger/src/sampling.rs | 7 +++++-- src/rust/logger/src/single.rs | 21 ++++++++++++++------- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/rust/logger/src/lib.rs b/src/rust/logger/src/lib.rs index 6835be3bb..e54c82a8d 100644 --- a/src/rust/logger/src/lib.rs +++ b/src/rust/logger/src/lib.rs @@ -70,7 +70,8 @@ static_metrics! { static LOG_WRITE_BYTE: Counter; static LOG_WRITE_EX: Counter; static LOG_SKIP: Counter; - static LOG_SKIP_BYTE: Counter; + static LOG_DROP: Counter; + static LOG_DROP_BYTE: Counter; static LOG_FLUSH: Counter; static LOG_FLUSH_EX: Counter; } @@ -150,3 +151,5 @@ pub fn configure_logging(debug_config: &DebugConfig, klog_config: &KlogConfig) - .build() .start() } + +metrics::test_no_duplicates!(); diff --git a/src/rust/logger/src/sampling.rs b/src/rust/logger/src/sampling.rs index b739631c7..289a35364 100644 --- a/src/rust/logger/src/sampling.rs +++ b/src/rust/logger/src/sampling.rs @@ -29,10 +29,13 @@ impl Log for SamplingLogger { return; } + let count = self.counter.fetch_add(1, Ordering::Relaxed); + // if this is the Nth message, we should log it - if self.counter.fetch_add(1, Ordering::Relaxed) == self.sample { - self.counter.fetch_sub(self.sample, Ordering::Relaxed); + if (count % self.sample) == 0 { self.logger.log(record) + } else { + LOG_SKIP.increment(); } } diff --git a/src/rust/logger/src/single.rs b/src/rust/logger/src/single.rs index 9fd0e906a..389c6b990 100644 --- a/src/rust/logger/src/single.rs +++ b/src/rust/logger/src/single.rs @@ -53,8 +53,8 @@ impl Log for Logger { LOG_WRITE.increment(); LOG_WRITE_BYTE.add(bytes as _); } else { - LOG_SKIP.increment(); - LOG_SKIP_BYTE.add(bytes as _); + LOG_DROP.increment(); + LOG_DROP_BYTE.add(bytes as _); } } } @@ -73,8 +73,13 @@ pub(crate) struct LogDrain { impl Drain for LogDrain { fn flush(&mut self) -> Result<(), Error> { + LOG_FLUSH.increment(); while let Some(mut log_buffer) = self.log_filled.pop() { - let _ = self.output.write(&log_buffer); + if let Err(e) = self.output.write_all(&log_buffer) { + LOG_WRITE_EX.increment(); + warn!("failed write to log buffer: {}", e); + return Err(e); + } // shrink oversized buffer if log_buffer.len() > self.buffer_size { @@ -87,12 +92,14 @@ impl Drain for LogDrain { log_buffer.clear(); let _ = self.log_cleared.push(log_buffer); } - LOG_FLUSH.increment(); - let result = self.output.flush(); - if result.is_err() { + + if let Err(e) = self.output.flush() { LOG_FLUSH_EX.increment(); + warn!("failed to flush log: {}", e); + Err(e) + } else { + Ok(()) } - result } }