From 6abffc4b12fc636150c0344847eb5443d94b5f27 Mon Sep 17 00:00:00 2001 From: Pavel Ivanov Date: Sun, 25 Feb 2024 10:51:29 +0100 Subject: [PATCH] new: added support for custom log message delimiters (#138) --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 1 + src/app.rs | 20 +- src/index.rs | 7 +- src/lib.rs | 1 + src/main.rs | 25 ++ src/scanning.rs | 637 +++++++++++++++++++++++++++++++++++++++++++----- 8 files changed, 619 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 689b9e18..2a82c594 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -738,7 +738,7 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hl" -version = "0.25.3-alpha.4" +version = "0.25.3-alpha.5" dependencies = [ "atoi", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 3b52394f..cc4e0276 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ categories = ["command-line-utilities"] description = "Utility for viewing json-formatted log files." keywords = ["cli", "human", "log"] name = "hl" -version = "0.25.3-alpha.4" +version = "0.25.3-alpha.5" edition = "2021" build = "build.rs" diff --git a/README.md b/README.md index a0edb07f..24aa948f 100644 --- a/README.md +++ b/README.md @@ -471,6 +471,7 @@ Options: --tail Number of last messages to preload from each file in --follow mode [default: 10] --sync-interval-ms Synchronization interval for live streaming mode enabled by --follow option [default: 100] -o, --output Output file + --delimiter Log message delimiter, [NUL, CR, LF, CRLF] or any custom string --dump-index Dump index metadata and exit --help Print help -V, --version Print version diff --git a/src/app.rs b/src/app.rs index 14f98fe3..1914f192 100644 --- a/src/app.rs +++ b/src/app.rs @@ -36,7 +36,7 @@ use crate::index::{Indexer, Timestamp}; use crate::input::{BlockLine, InputHolder, InputReference, Input}; use crate::model::{Filter, Parser, ParserSettings, RawRecord, Record, RecordFilter, RecordWithSourceConstructor}; use crate::query::Query; -use crate::scanning::{BufFactory, Scanner, Segment, SegmentBuf, SegmentBufFactory}; +use crate::scanning::{BufFactory, Delimit, Delimiter, Scanner, SearchExt, Segment, SegmentBuf, SegmentBufFactory}; use crate::serdex::StreamDeserializerWithOffsets; use crate::settings::{Fields, Formatting}; use crate::theme::{Element, StylingPush, Theme}; @@ -68,6 +68,7 @@ pub struct Options { pub dump_index: bool, pub app_dirs: Option, pub tail: u64, + pub delimiter: Delimiter, } impl Options { @@ -135,7 +136,7 @@ impl App { // spawn reader thread let reader = scope.spawn(closure!(clone sfi, |_| -> Result<()> { let mut tx = StripedSender::new(txi); - let scanner = Scanner::new(sfi, b'\n'); + let scanner = Scanner::new(sfi, &self.options.delimiter); for (i, mut input) in inputs.into_iter().enumerate() { for item in scanner.items(&mut input.stream).with_max_segment_size(self.options.max_message_size.into()) { if tx.send((i, item?)).is_none() { @@ -204,6 +205,7 @@ impl App { NonZeroU32::try_from(self.options.max_message_size)?.try_into()?, cache_dir, &self.options.fields.settings.predefined, + self.options.delimiter.clone(), ); let input_badges = self.input_badges(inputs.iter().map(|x| &x.reference)); @@ -403,7 +405,7 @@ impl App { let mut readers = Vec::with_capacity(m); for (i, input_ref) in inputs.into_iter().enumerate() { let reader = scope.spawn(closure!(clone sfi, clone txi, |_| -> Result<()> { - let scanner = Scanner::new(sfi.clone(), b'\n'); + let scanner = Scanner::new(sfi.clone(), &self.options.delimiter); let mut meta = None; if let InputReference::File(filename) = &input_ref { meta = Some(fs::metadata(filename)?); @@ -703,6 +705,7 @@ impl App { let options = SegmentProcessorOptions{ allow_prefix: self.options.allow_prefix, allow_unparsed_data: self.options.filter.is_empty() && self.options.query.is_none(), + delimiter: self.options.delimiter.clone(), }; SegmentProcessor::new(parser, self.formatter(), self.options.filter_and_query(), options) @@ -721,6 +724,7 @@ pub trait SegmentProcess { pub struct SegmentProcessorOptions { pub allow_prefix: bool, pub allow_unparsed_data: bool, + pub delimiter: Delimiter, } // --- @@ -753,7 +757,7 @@ impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProc where O: RecordObserver, { - for data in rtrim(data, b'\n').split(|c| *c == b'\n') { + for data in self.options.delimiter.clone().into_searcher().split(data) { if data.len() == 0 { continue; } @@ -924,14 +928,6 @@ impl StripedSender { // --- -fn rtrim<'a>(s: &'a [u8], c: u8) -> &'a [u8] { - if s.len() > 0 && s[s.len() - 1] == c { - &s[..s.len() - 1] - } else { - s - } -} - fn common_prefix_len<'a, V, I>(items: &'a Vec) -> usize where V: 'a + Eq + PartialEq + Copy, diff --git a/src/index.rs b/src/index.rs index d40562a5..ac8cdcea 100644 --- a/src/index.rs +++ b/src/index.rs @@ -37,7 +37,7 @@ use crate::index_capnp as schema; use crate::input::Input; use crate::level::Level; use crate::model::{Parser, ParserSettings, RawRecord}; -use crate::scanning::{Scanner, Segment, SegmentBuf, SegmentBufFactory}; +use crate::scanning::{Delimiter, Scanner, Segment, SegmentBuf, SegmentBufFactory}; use crate::settings::PredefinedFields; // types @@ -128,6 +128,7 @@ pub struct Indexer { max_message_size: u32, dir: PathBuf, parser: Parser, + delimiter: Delimiter, } impl Indexer { @@ -138,6 +139,7 @@ impl Indexer { max_message_size: u32, dir: PathBuf, fields: &PredefinedFields, + delimiter: Delimiter, ) -> Self { Self { concurrency, @@ -145,6 +147,7 @@ impl Indexer { max_message_size, dir, parser: Parser::new(ParserSettings::new(&fields, empty(), false)), + delimiter, } } @@ -252,7 +255,7 @@ impl Indexer { // spawn reader thread let reader = scope.spawn(closure!(clone sfi, |_| -> Result<()> { let mut sn: usize = 0; - let scanner = Scanner::new(sfi, b'\n'); + let scanner = Scanner::new(sfi, &self.delimiter); for item in scanner.items(input).with_max_segment_size(self.max_message_size.try_into()?) { if let Err(_) = txi[sn % n].send((sn, item?)) { break; diff --git a/src/lib.rs b/src/lib.rs index 29e6dd68..3e11425c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ pub use filtering::DefaultNormalizing; pub use formatting::RecordFormatter; pub use model::{FieldFilterSet, Filter, Level, Parser, ParserSettings, RecordFilter}; pub use query::Query; +pub use scanning::Delimiter; pub use settings::Settings; pub use theme::Theme; diff --git a/src/main.rs b/src/main.rs index b1fd61f3..9eb7a54f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,6 +28,7 @@ use hl::signal::SignalHandler; use hl::theme::{Theme, ThemeOrigin}; use hl::timeparse::parse_time; use hl::timezone::Tz; +use hl::Delimiter; use hl::{IncludeExcludeKeyFilter, KeyMatchOptions}; // --- @@ -203,6 +204,10 @@ struct Opt { #[arg(long, short = 'o', overrides_with = "output")] output: Option, + /// Log message delimiter, [NUL, CR, LF, CRLF] or any custom string. + #[arg(long, overrides_with = "delimiter")] + delimiter: Option, + /// Dump index metadata and exit. #[arg(long)] dump_index: bool, @@ -389,6 +394,25 @@ fn run() -> Result<()> { } } + let mut delimiter = Delimiter::default(); + if let Some(d) = opt.delimiter { + delimiter = match d.to_lowercase().as_str() { + "nul" => Delimiter::Byte(0), + "lf" => Delimiter::Byte(b'\n'), + "cr" => Delimiter::Byte(b'\r'), + "crlf" => Delimiter::default(), + _ => { + if d.len() == 1 { + Delimiter::Byte(d.as_bytes()[0]) + } else if d.len() > 1 { + Delimiter::Str(d) + } else { + Delimiter::default() + } + } + }; + } + // Create app. let app = hl::App::new(hl::Options { theme: Arc::new(theme), @@ -421,6 +445,7 @@ fn run() -> Result<()> { dump_index: opt.dump_index, app_dirs: Some(app_dirs), tail: opt.tail, + delimiter, }); // Configure input. diff --git a/src/scanning.rs b/src/scanning.rs index 5ca775b0..3b4fcaa9 100644 --- a/src/scanning.rs +++ b/src/scanning.rs @@ -1,7 +1,9 @@ // std imports +use std::cmp::min; use std::collections::VecDeque; use std::convert::From; use std::io::Read; +use std::ops::Range; use std::sync::Arc; // third-party imports @@ -14,58 +16,457 @@ use crate::error::*; /// Scans input stream and splits it into segments containing a whole number of tokens delimited by the given delimiter. /// If a single token exceeds size of a buffer allocated by SegmentBufFactory, it is split into multiple Incomplete segments. -pub struct Scanner { - delimiter: u8, +pub struct Scanner { + delimiter: D, sf: Arc, } -impl Scanner { +impl Scanner { /// Returns a new Scanner with the given parameters. - pub fn new(sf: Arc, delimiter: u8) -> Self { + pub fn new(sf: Arc, delimiter: D) -> Self { Self { delimiter, sf } } /// Returns an iterator over segments found in the input. - pub fn items<'a, 'b>(&'a self, input: &'b mut dyn Read) -> ScannerIter<'a, 'b> { + pub fn items<'a, 'b>(&'a self, input: &'b mut dyn Read) -> ScannerIter<'a, 'b, D> { return ScannerIter::new(self, input); } } // --- +/// Defines a token delimiter for Scanner. +#[derive(Clone)] +pub enum Delimiter { + Byte(u8), + Bytes(Vec), + Char(char), + Str(String), + SmartNewLine, +} + +impl Default for Delimiter { + fn default() -> Self { + Self::SmartNewLine + } +} + +impl From for Delimiter { + fn from(d: u8) -> Self { + Self::Byte(d) + } +} + +impl From> for Delimiter { + fn from(d: Vec) -> Self { + Self::Bytes(d) + } +} + +impl From<&[u8]> for Delimiter { + fn from(d: &[u8]) -> Self { + Self::Bytes(d.into()) + } +} + +impl From for Delimiter { + fn from(d: char) -> Self { + Self::Char(d) + } +} + +impl From<&str> for Delimiter { + fn from(d: &str) -> Self { + Self::Str(d.into()) + } +} + +impl From for Delimiter { + fn from(d: String) -> Self { + Self::Str(d) + } +} + +impl From for Delimiter { + fn from(_: SmartNewLine) -> Self { + Self::SmartNewLine + } +} + +impl Delimit for Delimiter { + type Searcher = Box; + + fn into_searcher(self) -> Self::Searcher { + match self { + Self::Byte(b) => Box::new(b.into_searcher()), + Self::Bytes(b) => Box::new(b.into_searcher()), + Self::Char(c) => Box::new(c.into_searcher()), + Self::Str(s) => Box::new(s.into_searcher()), + Self::SmartNewLine => Box::new(SmartNewLine.into_searcher()), + } + } +} + +// --- + +/// Defines a trait for token delimiters for Scanner. +pub trait Delimit: Clone { + type Searcher: Search; + + fn into_searcher(self) -> Self::Searcher; +} + +impl Delimit for u8 { + type Searcher = u8; + + fn into_searcher(self) -> Self::Searcher { + self + } +} + +impl<'a> Delimit for &'a [u8] { + type Searcher = SubStrSearcher; + + fn into_searcher(self) -> Self::Searcher { + SubStrSearcher::new(self) + } +} + +impl Delimit for char { + type Searcher = SubStrSearcher>; + + fn into_searcher(self) -> Self::Searcher { + let mut buf = [0; 4]; + self.encode_utf8(&mut buf); + SubStrSearcher::new(heapless::Vec::from_slice(&buf[..self.len_utf8()]).unwrap()) + } +} + +impl<'a> Delimit for &'a str { + type Searcher = SubStrSearcher; + + fn into_searcher(self) -> Self::Searcher { + SubStrSearcher::new(self) + } +} + +impl<'a> Delimit for &'a String { + type Searcher = SubStrSearcher; + + fn into_searcher(self) -> Self::Searcher { + SubStrSearcher::new(self) + } +} + +impl Delimit for String { + type Searcher = SubStrSearcher; + + fn into_searcher(self) -> Self::Searcher { + SubStrSearcher::new(self) + } +} + +impl Delimit for Vec { + type Searcher = SubStrSearcher; + + fn into_searcher(self) -> Self::Searcher { + SubStrSearcher::new(self) + } +} + +impl Delimit for &Delimiter { + type Searcher = Box; + + fn into_searcher(self) -> Self::Searcher { + self.clone().into_searcher() + } +} + +// --- + +/// Defines a smart new line delimiter that can be either LF or CRLF. +#[derive(Clone)] +pub struct SmartNewLine; + +impl Delimit for SmartNewLine { + type Searcher = SmartNewLineSearcher; + + fn into_searcher(self) -> Self::Searcher { + Self::Searcher {} + } +} + +// --- + +/// Defines a token delimiter search algorithm. +pub trait Search { + fn search_r(&self, buf: &[u8], edge: bool) -> Option>; + fn search_l(&self, buf: &[u8], edge: bool) -> Option>; + fn partial_match_r(&self, buf: &[u8]) -> Option; + fn partial_match_l(&self, buf: &[u8]) -> Option; +} + +impl Search for u8 { + fn search_r(&self, buf: &[u8], _: bool) -> Option> { + buf.iter().rposition(|x| x == self).map(|x| x..x + 1) + } + + fn search_l(&self, buf: &[u8], _: bool) -> Option> { + buf.iter().position(|x| x == self).map(|x| x..x + 1) + } + + fn partial_match_l(&self, _: &[u8]) -> Option { + None + } + + fn partial_match_r(&self, _: &[u8]) -> Option { + None + } +} + +impl Search for Box { + fn search_r(&self, buf: &[u8], edge: bool) -> Option> { + self.as_ref().search_r(buf, edge) + } + + fn search_l(&self, buf: &[u8], edge: bool) -> Option> { + self.as_ref().search_l(buf, edge) + } + + fn partial_match_r(&self, buf: &[u8]) -> Option { + self.as_ref().partial_match_r(buf) + } + + fn partial_match_l(&self, buf: &[u8]) -> Option { + self.as_ref().partial_match_l(buf) + } +} + +// --- + +// Extends Search with a split method. +pub trait SearchExt: Search { + fn split<'a, 'b>(&'a self, buf: &'b [u8]) -> SplitIter<'a, 'b, Self> + where + Self: Sized, + { + SplitIter { + searcher: self, + buf, + pos: 0, + } + } +} + +impl SearchExt for T {} + +// Iterates over the input buffer and returns slices of the buffer separated by the delimiter. +pub struct SplitIter<'a, 'b, S: Search + ?Sized> { + searcher: &'a S, + buf: &'b [u8], + pos: usize, +} + +impl<'a, 'b, S: Search> Iterator for SplitIter<'a, 'b, S> { + type Item = &'b [u8]; + + fn next(&mut self) -> Option { + if self.pos >= self.buf.len() { + return None; + } + + let buf = &self.buf[self.pos..]; + let range = self.searcher.search_l(buf, true); + if let Some(range) = range { + self.pos += range.end; + return Some(&buf[..range.start]); + } + + None + } +} + +// --- + +/// Searches for a substring in a byte slice. +pub struct SubStrSearcher { + delimiter: D, +} + +impl> SubStrSearcher { + pub fn new(delimiter: D) -> Self { + Self { delimiter } + } + + fn len(&self) -> usize { + self.delimiter.as_ref().len() + } +} + +impl> Search for SubStrSearcher { + fn search_r(&self, buf: &[u8], _edge: bool) -> Option> { + let needle = self.delimiter.as_ref(); + if needle.len() == 0 { + return None; + } + + let b = needle[0]; + let mut pos = buf.len(); + loop { + if let Some(i) = buf[..pos].iter().rposition(|x| *x == b) { + pos = i; + } else { + return None; + } + if buf[pos..].starts_with(needle) { + return Some(pos..pos + needle.len()); + } + } + } + + fn search_l(&self, buf: &[u8], _edge: bool) -> Option> { + let needle = self.delimiter.as_ref(); + if needle.len() == 0 { + return None; + } + + let b = needle[0]; + let mut pos = 0; + loop { + if let Some(i) = buf[pos..].iter().position(|x| *x == b) { + pos = i; + } else { + return None; + } + if buf[pos..].starts_with(needle) { + return Some(pos..pos + needle.len()); + } + pos += 1; + } + } + + fn partial_match_r(&self, buf: &[u8]) -> Option { + if self.len() < 2 { + return None; + } + + let end = buf.len(); + let begin = end.saturating_sub(self.len() - 1); + for i in begin..end { + if self.delimiter.as_ref().starts_with(&buf[i..]) { + return Some(i); + } + } + + None + } + + fn partial_match_l(&self, buf: &[u8]) -> Option { + if self.len() < 2 { + return None; + } + + let begin = 0; + let end = begin + min(buf.len(), self.len() - 1); + for i in (begin..end).rev() { + if self.delimiter.as_ref().ends_with(&buf[..i]) { + return Some(i); + } + } + + None + } +} + +// --- + +/// Searches for a new line in a byte slice that can be either LF or CRLF. +pub struct SmartNewLineSearcher; + +impl Search for SmartNewLineSearcher { + fn search_r(&self, buf: &[u8], _edge: bool) -> Option> { + buf.iter().rposition(|x| *x == b'\n').and_then(|i| { + if i > 0 && buf[i - 1] == b'\r' { + Some(i - 1..i + 1) + } else { + Some(i..i + 1) + } + }) + } + + fn search_l(&self, buf: &[u8], edge: bool) -> Option> { + if buf.len() < 2 { + return None; + } + + let b = if edge { 0 } else { 1 }; + + buf[b..].iter().position(|x| *x == b'\n').and_then(|i| { + if i > 0 && buf[i] == b'\r' { + Some(b + i - 1..b + i + 1) + } else { + Some(b + i..b + i + 1) + } + }) + } + + fn partial_match_r(&self, buf: &[u8]) -> Option { + if buf.len() > 0 && buf[buf.len() - 1] == b'\r' { + Some(buf.len() - 1) + } else { + None + } + } + + fn partial_match_l(&self, buf: &[u8]) -> Option { + if buf.len() > 0 && buf[0] == b'\n' { + Some(1) + } else { + None + } + } +} + +// --- + /// Contains a pre-allocated data buffer for a Segment and data size. #[derive(Eq)] pub struct SegmentBuf { - data: Vec, + buf: Vec, size: usize, } impl SegmentBuf { /// Returns a reference to the contained data. pub fn data(&self) -> &[u8] { - &self.data[..self.size] + &self.buf[..self.size] } - /// Converts the SegmentBuf to a Vec. + /// Returns data size. + pub fn len(&self) -> usize { + self.size + } + + /// Transforms the SegmentBuf into inner Vec. pub fn into_inner(self) -> Vec { - self.data + self.buf } fn new(capacity: usize) -> Self { - let mut data = Vec::with_capacity(capacity); - data.resize(capacity, 0); - Self { data, size: 0 } + let mut buf = Vec::with_capacity(capacity); + buf.resize(capacity, 0); + Self { buf, size: 0 } } fn zero() -> Self { Self { - data: Vec::new(), + buf: Vec::new(), size: 0, } } fn reset(&mut self) { - self.data.resize(self.data.capacity(), 0); + self.buf.resize(self.buf.capacity(), 0); self.size = 0; } @@ -100,7 +501,7 @@ impl> From for SegmentBuf { fn from(data: T) -> Self { let size = data.as_ref().len(); Self { - data: data.as_ref().into(), + buf: data.as_ref().into(), size, } } @@ -165,7 +566,7 @@ impl SegmentBufFactory { /// Recycles the given SegmentBuf. pub fn recycle(&self, buf: SegmentBuf) { - if buf.data.capacity() == self.buf_size { + if buf.buf.capacity() == self.buf_size { self.recycled.push(buf); } } @@ -208,55 +609,65 @@ impl BufFactory { // --- /// Iterates over the input stream and returns segments containing one or more tokens. -pub struct ScannerIter<'a, 'b> { - scanner: &'a Scanner, +pub struct ScannerIter<'a, 'b, D: Delimit> { + scanner: &'a Scanner, input: &'b mut dyn Read, next: SegmentBuf, + searcher: D::Searcher, placement: Option, done: bool, } -impl<'a, 'b> ScannerIter<'a, 'b> { - pub fn with_max_segment_size(self, max_segment_size: usize) -> ScannerJumboIter<'a, 'b> { +impl<'a, 'b, D: Delimit> ScannerIter<'a, 'b, D> { + pub fn with_max_segment_size(self, max_segment_size: usize) -> ScannerJumboIter<'a, 'b, D> { ScannerJumboIter::new(self, max_segment_size) } - fn new(scanner: &'a Scanner, input: &'b mut dyn Read) -> Self { + fn new(scanner: &'a Scanner, input: &'b mut dyn Read) -> Self { return Self { scanner, input, next: scanner.sf.new_segment(), + searcher: scanner.delimiter.clone().into_searcher(), placement: None, done: false, }; } - fn split(&mut self) -> Option { - if self.next.size < 1 { + fn split(&mut self, full: bool, edge: bool) -> Option<(SegmentBuf, bool)> { + if self.next.len() < 1 { return None; } - self.next.data[..self.next.size] - .rsplit(|x| *x == self.scanner.delimiter) - .next() - .map(|data| data.len()) - .and_then(|n| self.split_n(n)) + let buf = self.next.data(); + let bs = buf.len(); + self.searcher + .search_r(buf, edge) + .map(|range| (range.end, true)) + .or_else(|| { + if full { + self.searcher.partial_match_r(buf).map(|n| (n, false)) + } else { + None + } + }) + .and_then(|(n, ok)| self.split_n(bs - n).map(|sb| (sb, ok))) } #[inline(always)] fn split_n(&mut self, n: usize) -> Option { - let s = self.next.size; - if n == s { + let bs = self.next.len(); + if n == bs { return None; } let mut result = self.scanner.sf.new_segment(); - if result.data.len() < n { - result.data.resize(n, 0); + if result.buf.len() < n { + result.buf.resize(n, 0); } if n > 0 { - result.data[..n].copy_from_slice(&self.next.data[s - n..s]); + result.buf[..n].copy_from_slice(&self.next.buf[bs - n..bs]); result.size = n; self.next.size -= n; } @@ -265,35 +676,43 @@ impl<'a, 'b> ScannerIter<'a, 'b> { } } -impl<'a, 'b> Iterator for ScannerIter<'a, 'b> { +impl<'a, 'b, D: Delimit> Iterator for ScannerIter<'a, 'b, D> { type Item = Result; fn next(&mut self) -> Option { - if self.done { - return None; - } + let bs = self.next.buf.len(); loop { - let n = match self.input.read(&mut self.next.data[self.next.size..]) { - Ok(value) => value, + let begin = self.next.size; + let end = bs; + let n = match self.input.read(&mut self.next.buf[begin..end]) { + Ok(n) => n, Err(err) => { self.done = true; return Some(Err(err.into())); } }; + self.next.size += n; - let full = self.next.size == self.next.data.capacity(); + let full = self.next.size == end; let (next, placement) = if n == 0 { self.done = true; (SegmentBuf::zero(), self.placement.and(Some(PartialPlacement::Last))) } else { - match self.split() { - Some(next) => { + match self.split(full, self.done) { + Some((next, true)) => { let result = (next, self.placement.and(Some(PartialPlacement::Last))); self.placement = None; result } + Some((next, false)) => { + self.placement = self + .placement + .and(Some(PartialPlacement::Next)) + .or(Some(PartialPlacement::First)); + (next, self.placement) + } None => { if !full { continue; @@ -322,15 +741,15 @@ impl<'a, 'b> Iterator for ScannerIter<'a, 'b> { /// Iterates over the input stream and returns segments containing tokens. /// Unlike ScannerIter ScannerJumboIter joins incomplete segments into a single complete segment /// if its size does not exceed max_segment_size. -pub struct ScannerJumboIter<'a, 'b> { - inner: ScannerIter<'a, 'b>, +pub struct ScannerJumboIter<'a, 'b, D: Delimit> { + inner: ScannerIter<'a, 'b, D>, max_segment_size: usize, fetched: VecDeque<(SegmentBuf, PartialPlacement)>, next: Option>, } -impl<'a, 'b> ScannerJumboIter<'a, 'b> { - fn new(inner: ScannerIter<'a, 'b>, max_segment_size: usize) -> Self { +impl<'a, 'b, D: Delimit> ScannerJumboIter<'a, 'b, D> { + fn new(inner: ScannerIter<'a, 'b, D>, max_segment_size: usize) -> Self { return Self { inner, max_segment_size, @@ -339,12 +758,21 @@ impl<'a, 'b> ScannerJumboIter<'a, 'b> { }; } - fn complete(&mut self, next: Option>) -> Option> { - if self.fetched.len() == 0 { - return next; - } + fn push(&mut self, buf: SegmentBuf, placement: PartialPlacement) { + self.fetched.push_back((buf, placement)); + } + + fn pop(&mut self) -> Option<(SegmentBuf, PartialPlacement)> { + self.fetched.pop_front() + } - self.next = next; + fn can_complete(&self) -> bool { + self.fetched.len() > 0 + && self.fetched.front().map(|x| x.1) == Some(PartialPlacement::First) + && self.fetched.back().map(|x| x.1) == Some(PartialPlacement::Last) + } + + fn complete(&mut self) -> Option> { let buf = self .fetched .iter() @@ -359,12 +787,12 @@ impl<'a, 'b> ScannerJumboIter<'a, 'b> { } } -impl<'a, 'b> Iterator for ScannerJumboIter<'a, 'b> { +impl<'a, 'b, D: Delimit> Iterator for ScannerJumboIter<'a, 'b, D> { type Item = Result; fn next(&mut self) -> Option { loop { - if let Some((buf, placement)) = self.fetched.pop_front() { + if let Some((buf, placement)) = self.pop() { return Some(Ok(Segment::Incomplete(buf, placement))); } if let Some(next) = self.next.take() { @@ -372,18 +800,26 @@ impl<'a, 'b> Iterator for ScannerJumboIter<'a, 'b> { } let mut total = 0; + loop { let next = self.inner.next(); match next { Some(Ok(Segment::Incomplete(buf, placement))) => { - total += buf.data().len(); - self.fetched.push_back((buf, placement)); - if placement == PartialPlacement::Last { - return self.complete(None); + total += buf.len(); + self.push(buf, placement); + if self.can_complete() { + return self.complete(); } } - next @ _ => { - return self.complete(next); + next @ Some(_) => { + self.next = next; + break; + } + None => { + if self.fetched.len() == 0 && self.next.is_none() { + return None; + } + break; } }; if total > self.max_segment_size { @@ -400,6 +836,17 @@ impl<'a, 'b> Iterator for ScannerJumboIter<'a, 'b> { mod tests { use super::*; + #[test] + fn test_split_iter() { + let searcher = b'/'.into_searcher(); + let buf = b"test/token/"; + let mut iter = searcher.split(buf); + + assert_eq!(iter.next(), Some(&b"test"[..])); + assert_eq!(iter.next(), Some(&b"token"[..])); + assert_eq!(iter.next(), None); + } + #[test] fn test_small_token() { let sf = Arc::new(SegmentBufFactory::new(20)); @@ -454,7 +901,7 @@ mod tests { #[test] fn test_jumbo_1() { let sf = Arc::new(SegmentBufFactory::new(2)); - let scanner = Scanner::new(sf.clone(), b'/'); + let scanner = Scanner::new(sf.clone(), '/'); let mut data = std::io::Cursor::new(b"test/token/very/large/"); let tokens = scanner .items(&mut data) @@ -471,4 +918,74 @@ mod tests { ] ) } + + #[test] + fn test_jumbo_2() { + let sf = Arc::new(SegmentBufFactory::new(3)); + let scanner = Scanner::new(sf.clone(), "/:"); + let mut data = std::io::Cursor::new(b"test/:token/:very/:large/:x/"); + let tokens = scanner + .items(&mut data) + .with_max_segment_size(7) + .collect::>>() + .unwrap(); + assert_eq!( + tokens, + vec![ + Segment::Complete(b"test/:".into()), + Segment::Complete(b"token/:".into()), + Segment::Complete(b"very/:".into()), + Segment::Complete(b"large/:".into()), + Segment::Complete(b"x/".into()), + ] + ) + } + + #[test] + fn test_jumbo_0() { + let sf = Arc::new(SegmentBufFactory::new(3)); + let scanner = Scanner::new(sf.clone(), ""); + let mut data = std::io::Cursor::new(b"test/:token/:very/:large/:"); + let tokens = scanner + .items(&mut data) + .with_max_segment_size(7) + .collect::>>() + .unwrap(); + assert_eq!( + tokens, + vec![ + Segment::Incomplete(b"tes".into(), PartialPlacement::First), + Segment::Incomplete(b"t/:".into(), PartialPlacement::Next), + Segment::Incomplete(b"tok".into(), PartialPlacement::Next), + Segment::Incomplete(b"en/".into(), PartialPlacement::Next), + Segment::Incomplete(b":ve".into(), PartialPlacement::Next), + Segment::Incomplete(b"ry/".into(), PartialPlacement::Next), + Segment::Incomplete(b":la".into(), PartialPlacement::Next), + Segment::Incomplete(b"rge".into(), PartialPlacement::Next), + Segment::Incomplete(b"/:".into(), PartialPlacement::Last), + ] + ) + } + + #[test] + fn test_jumbo_smart_new_line() { + let sf = Arc::new(SegmentBufFactory::new(3)); + let scanner = Scanner::new(sf.clone(), SmartNewLine); + let mut data = std::io::Cursor::new(b"test\r\ntoken\r\nvery\r\nlarge\nx/"); + let tokens = scanner + .items(&mut data) + .with_max_segment_size(7) + .collect::>>() + .unwrap(); + assert_eq!( + tokens, + vec![ + Segment::Complete(b"test\r\n".into()), + Segment::Complete(b"token\r\n".into()), + Segment::Complete(b"very\r\n".into()), + Segment::Complete(b"large\n".into()), + Segment::Complete(b"x/".into()), + ] + ) + } }