Skip to content

Commit

Permalink
new: improved segment scanning performance
Browse files: browse the repository at this point in the history
  • Loading branch information
pamburus committed Jan 28, 2024
1 parent 9f51a7c commit 264e133
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ categories = ["command-line-utilities"]
description = "Utility for viewing json-formatted log files."
keywords = ["cli", "human", "log"]
name = "hl"
version = "0.25.1-beta.1"
version = "0.25.1-beta.2"
edition = "2021"
build = "build.rs"

Expand Down
4 changes: 2 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl App {
// spawn reader thread
let reader = scope.spawn(closure!(clone sfi, |_| -> Result<()> {
let mut tx = StripedSender::new(txi);
let scanner = Scanner::new(sfi, "\n".to_string());
let scanner = Scanner::new(sfi, b'\n');
for (i, mut input) in inputs.into_iter().enumerate() {
for item in scanner.items(&mut input.stream).with_max_segment_size(self.options.max_message_size.into()) {
if tx.send((i, item?)).is_none() {
Expand Down Expand Up @@ -404,7 +404,7 @@ impl App {
let mut readers = Vec::with_capacity(m);
for (i, input_ref) in inputs.into_iter().enumerate() {
let reader = scope.spawn(closure!(clone sfi, clone txi, |_| -> Result<()> {
let scanner = Scanner::new(sfi.clone(), "\n".to_string());
let scanner = Scanner::new(sfi.clone(), b'\n');
let mut meta = None;
if let InputReference::File(filename) = &input_ref {
meta = Some(fs::metadata(filename)?);
Expand Down
2 changes: 1 addition & 1 deletion src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl Indexer {
// spawn reader thread
let reader = scope.spawn(closure!(clone sfi, |_| -> Result<()> {
let mut sn: usize = 0;
let scanner = Scanner::new(sfi, "\n".to_string());
let scanner = Scanner::new(sfi, b'\n');
for item in scanner.items(input).with_max_segment_size(self.max_message_size.try_into()?) {
if let Err(_) = txi[sn % n].send(item?) {
break;
Expand Down
64 changes: 35 additions & 29 deletions src/scanning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@ use crate::error::*;
/// Scans input stream and splits it into segments containing a whole number of tokens delimited by the given delimiter.
/// If a single token exceeds size of a buffer allocated by SegmentBufFactory, it is split into multiple Incomplete segments.
pub struct Scanner {
delimiter: String,
delimiter: u8,
sf: Arc<SegmentBufFactory>,
}

impl Scanner {
/// Returns a new Scanner with the given parameters.
pub fn new(sf: Arc<SegmentBufFactory>, delimiter: String) -> Self {
Self {
delimiter: delimiter.clone(),
sf,
}
pub fn new(sf: Arc<SegmentBufFactory>, delimiter: u8) -> Self {
Self { delimiter, sf }
}

/// Returns an iterator over segments found in the input.
Expand Down Expand Up @@ -236,27 +233,36 @@ impl<'a, 'b> ScannerIter<'a, 'b> {
}

fn split(&mut self) -> Option<SegmentBuf> {
let k = self.scanner.delimiter.len();
if self.next.size < k || k == 0 {
if self.next.size < 1 {
return None;
}

for i in (0..self.next.size - k + 1).rev() {
if self.next.data[i..].starts_with(self.scanner.delimiter.as_bytes()) {
let n = self.next.size - i - k;
let mut result = self.scanner.sf.new_segment();
if result.data.len() < n {
result.data.resize(n, 0);
}
if n > 0 {
result.data[..n].copy_from_slice(&self.next.data[i + k..i + k + n]);
result.size = n;
self.next.size -= n;
}
return Some(result);
}
self.next.data[..self.next.size]
.rsplit(|x| *x == self.scanner.delimiter)
.next()
.map(|data| data.len())
.and_then(|n| self.split_n(n))
}

/// Moves the last `n` bytes of the pending buffer into a freshly allocated segment.
///
/// Returns `None` when `n` equals the current size of the pending buffer,
/// i.e. the whole buffer would have to be moved. Otherwise returns a new
/// segment obtained from the scanner's factory; when `n > 0` its first `n`
/// bytes are a copy of the tail of `self.next`, and `self.next.size` is
/// reduced by `n`.
///
/// The caller must ensure `n <= self.next.size`; a larger `n` makes the
/// `s - n` subtraction underflow.
#[inline(always)]
fn split_n(&mut self, n: usize) -> Option<SegmentBuf> {
    let s = self.next.size;
    if n == s {
        return None;
    }

    let mut result = self.scanner.sf.new_segment();
    // Grow the new buffer if the factory's default allocation is too small for the tail.
    if result.data.len() < n {
        result.data.resize(n, 0);
    }

    // With `n == 0` the new segment is returned as produced by the factory
    // (presumably empty — confirm against `new_segment`).
    if n > 0 {
        result.data[..n].copy_from_slice(&self.next.data[s - n..s]);
        result.size = n;
        self.next.size -= n;
    }

    Some(result)
}
}

Expand Down Expand Up @@ -398,7 +404,7 @@ mod tests {
#[test]
fn test_small_token() {
let sf = Arc::new(SegmentBufFactory::new(20));
let scanner = Scanner::new(sf.clone(), "/".into());
let scanner = Scanner::new(sf.clone(), b'/');
let mut data = std::io::Cursor::new(b"token");
let tokens = scanner.items(&mut data).collect::<Result<Vec<_>>>().unwrap();
assert_eq!(tokens, vec![Segment::Complete(b"token".into())])
Expand All @@ -407,7 +413,7 @@ mod tests {
#[test]
fn test_empty_token_and_small_token() {
let sf = Arc::new(SegmentBufFactory::new(20));
let scanner = Scanner::new(sf.clone(), "/".into());
let scanner = Scanner::new(sf.clone(), b'/');
let mut data = std::io::Cursor::new(b"/token");
let tokens = scanner.items(&mut data).collect::<Result<Vec<_>>>().unwrap();
assert_eq!(
Expand All @@ -419,7 +425,7 @@ mod tests {
#[test]
fn test_small_token_and_empty_token() {
let sf = Arc::new(SegmentBufFactory::new(20));
let scanner = Scanner::new(sf.clone(), "/".into());
let scanner = Scanner::new(sf.clone(), b'/');
let mut data = std::io::Cursor::new(b"token/");
let tokens = scanner.items(&mut data).collect::<Result<Vec<_>>>().unwrap();
assert_eq!(tokens, vec![Segment::Complete(b"token/".into())])
Expand All @@ -428,7 +434,7 @@ mod tests {
#[test]
fn test_two_small_tokens() {
let sf = Arc::new(SegmentBufFactory::new(20));
let scanner = Scanner::new(sf.clone(), "/".into());
let scanner = Scanner::new(sf.clone(), b'/');
let mut data = std::io::Cursor::new(b"test/token/");
let tokens = scanner.items(&mut data).collect::<Result<Vec<_>>>().unwrap();
assert_eq!(tokens, vec![Segment::Complete(b"test/token/".into())])
Expand All @@ -437,7 +443,7 @@ mod tests {
#[test]
fn test_two_tokens_over_segment_size() {
let sf = Arc::new(SegmentBufFactory::new(10));
let scanner = Scanner::new(sf.clone(), "/".into());
let scanner = Scanner::new(sf.clone(), b'/');
let mut data = std::io::Cursor::new(b"test/token/");
let tokens = scanner.items(&mut data).collect::<Result<Vec<_>>>().unwrap();
assert_eq!(
Expand All @@ -449,7 +455,7 @@ mod tests {
#[test]
fn test_jumbo_1() {
let sf = Arc::new(SegmentBufFactory::new(2));
let scanner = Scanner::new(sf.clone(), "/".into());
let scanner = Scanner::new(sf.clone(), b'/');
let mut data = std::io::Cursor::new(b"test/token/very/large/");
let tokens = scanner
.items(&mut data)
Expand Down

0 comments on commit 264e133

Please sign in to comment.