From 264e1339d336a41bf40215bfec29c60511ab05dd Mon Sep 17 00:00:00 2001 From: Pavel Ivanov Date: Sat, 27 Jan 2024 21:00:36 +0100 Subject: [PATCH] new: improved segment scanning performance --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/app.rs | 4 ++-- src/index.rs | 2 +- src/scanning.rs | 64 +++++++++++++++++++++++++++---------------------- 5 files changed, 40 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e20a95a..bc2a1e4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -717,7 +717,7 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hl" -version = "0.25.1-beta.1" +version = "0.25.1-beta.2" dependencies = [ "atoi", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 74fc24bc..317e0ded 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ categories = ["command-line-utilities"] description = "Utility for viewing json-formatted log files." keywords = ["cli", "human", "log"] name = "hl" -version = "0.25.1-beta.1" +version = "0.25.1-beta.2" edition = "2021" build = "build.rs" diff --git a/src/app.rs b/src/app.rs index ef7afebf..a41d4df4 100644 --- a/src/app.rs +++ b/src/app.rs @@ -136,7 +136,7 @@ impl App { // spawn reader thread let reader = scope.spawn(closure!(clone sfi, |_| -> Result<()> { let mut tx = StripedSender::new(txi); - let scanner = Scanner::new(sfi, "\n".to_string()); + let scanner = Scanner::new(sfi, b'\n'); for (i, mut input) in inputs.into_iter().enumerate() { for item in scanner.items(&mut input.stream).with_max_segment_size(self.options.max_message_size.into()) { if tx.send((i, item?)).is_none() { @@ -404,7 +404,7 @@ impl App { let mut readers = Vec::with_capacity(m); for (i, input_ref) in inputs.into_iter().enumerate() { let reader = scope.spawn(closure!(clone sfi, clone txi, |_| -> Result<()> { - let scanner = Scanner::new(sfi.clone(), "\n".to_string()); + let scanner = Scanner::new(sfi.clone(), b'\n'); let mut meta = None; if let InputReference::File(filename) = &input_ref { meta = Some(fs::metadata(filename)?); diff --git a/src/index.rs b/src/index.rs index 88b5c80b..fe7d23d7 100644 --- a/src/index.rs +++ b/src/index.rs @@ -234,7 +234,7 @@ impl Indexer { // spawn reader thread let reader = scope.spawn(closure!(clone sfi, |_| -> Result<()> { let mut sn: usize = 0; - let scanner = Scanner::new(sfi, "\n".to_string()); + let scanner = Scanner::new(sfi, b'\n'); for item in scanner.items(input).with_max_segment_size(self.max_message_size.try_into()?) { if let Err(_) = txi[sn % n].send(item?) { break; diff --git a/src/scanning.rs b/src/scanning.rs index e0b80e98..e0cf8a4b 100644 --- a/src/scanning.rs +++ b/src/scanning.rs @@ -15,17 +15,14 @@ use crate::error::*; /// Scans input stream and splits it into segments containing a whole number of tokens delimited by the given delimiter. /// If a single token exceeds size of a buffer allocated by SegmentBufFactory, it is split into multiple Incomplete segments. pub struct Scanner { - delimiter: String, + delimiter: u8, sf: Arc, } impl Scanner { /// Returns a new Scanner with the given parameters. - pub fn new(sf: Arc, delimiter: String) -> Self { - Self { - delimiter: delimiter.clone(), - sf, - } + pub fn new(sf: Arc, delimiter: u8) -> Self { + Self { delimiter, sf } } /// Returns an iterator over segments found in the input. @@ -236,27 +233,36 @@ impl<'a, 'b> ScannerIter<'a, 'b> { } fn split(&mut self) -> Option { - let k = self.scanner.delimiter.len(); - if self.next.size < k || k == 0 { + if self.next.size < 1 { return None; } - for i in (0..self.next.size - k + 1).rev() { - if self.next.data[i..].starts_with(self.scanner.delimiter.as_bytes()) { - let n = self.next.size - i - k; - let mut result = self.scanner.sf.new_segment(); - if result.data.len() < n { - result.data.resize(n, 0); - } - if n > 0 { - result.data[..n].copy_from_slice(&self.next.data[i + k..i + k + n]); - result.size = n; - self.next.size -= n; - } - return Some(result); - } + self.next.data[..self.next.size] + .rsplit(|x| *x == self.scanner.delimiter) + .next() + .map(|data| data.len()) + .and_then(|n| self.split_n(n)) + } + + #[inline(always)] + fn split_n(&mut self, n: usize) -> Option { + let s = self.next.size; + if n == s { + return None; + } + + let mut result = self.scanner.sf.new_segment(); + if result.data.len() < n { + result.data.resize(n, 0); } - None + + if n > 0 { + result.data[..n].copy_from_slice(&self.next.data[s - n..s]); + result.size = n; + self.next.size -= n; + } + + Some(result) } } @@ -398,7 +404,7 @@ mod tests { #[test] fn test_small_token() { let sf = Arc::new(SegmentBufFactory::new(20)); - let scanner = Scanner::new(sf.clone(), "/".into()); + let scanner = Scanner::new(sf.clone(), b'/'); let mut data = std::io::Cursor::new(b"token"); let tokens = scanner.items(&mut data).collect::>>().unwrap(); assert_eq!(tokens, vec![Segment::Complete(b"token".into())]) @@ -407,7 +413,7 @@ mod tests { #[test] fn test_empty_token_and_small_token() { let sf = Arc::new(SegmentBufFactory::new(20)); - let scanner = Scanner::new(sf.clone(), "/".into()); + let scanner = Scanner::new(sf.clone(), b'/'); let mut data = std::io::Cursor::new(b"/token"); let tokens = scanner.items(&mut data).collect::>>().unwrap(); assert_eq!( @@ -419,7 +425,7 @@ mod tests { #[test] fn test_small_token_and_empty_token() { let sf = Arc::new(SegmentBufFactory::new(20)); - let scanner = Scanner::new(sf.clone(), "/".into()); + let scanner = Scanner::new(sf.clone(), b'/'); let mut data = std::io::Cursor::new(b"token/"); let tokens = scanner.items(&mut data).collect::>>().unwrap(); assert_eq!(tokens, vec![Segment::Complete(b"token/".into())]) @@ -428,7 +434,7 @@ mod tests { #[test] fn test_two_small_tokens() { let sf = Arc::new(SegmentBufFactory::new(20)); - let scanner = Scanner::new(sf.clone(), "/".into()); + let scanner = Scanner::new(sf.clone(), b'/'); let mut data = std::io::Cursor::new(b"test/token/"); let tokens = scanner.items(&mut data).collect::>>().unwrap(); assert_eq!(tokens, vec![Segment::Complete(b"test/token/".into())]) @@ -437,7 +443,7 @@ mod tests { #[test] fn test_two_tokens_over_segment_size() { let sf = Arc::new(SegmentBufFactory::new(10)); - let scanner = Scanner::new(sf.clone(), "/".into()); + let scanner = Scanner::new(sf.clone(), b'/'); let mut data = std::io::Cursor::new(b"test/token/"); let tokens = scanner.items(&mut data).collect::>>().unwrap(); assert_eq!( @@ -449,7 +455,7 @@ mod tests { #[test] fn test_jumbo_1() { let sf = Arc::new(SegmentBufFactory::new(2)); - let scanner = Scanner::new(sf.clone(), "/".into()); + let scanner = Scanner::new(sf.clone(), b'/'); let mut data = std::io::Cursor::new(b"test/token/very/large/"); let tokens = scanner .items(&mut data)