diff --git a/Cargo.lock b/Cargo.lock index 1a8c0742..22c7b889 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -611,7 +611,7 @@ checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" [[package]] name = "encstr" -version = "0.29.2-alpha.4" +version = "0.29.2-alpha.5" [[package]] name = "enum-map" @@ -768,7 +768,7 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hl" -version = "0.29.2-alpha.4" +version = "0.29.2-alpha.5" dependencies = [ "atoi", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 2c613031..4771968a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [".", "crate/encstr"] [workspace.package] repository = "https://github.com/pamburus/hl" authors = ["Pavel Ivanov "] -version = "0.29.2-alpha.4" +version = "0.29.2-alpha.5" edition = "2021" license = "MIT" diff --git a/benches/parse-and-format.rs b/benches/parse-and-format.rs index 4befb3f3..b600d20d 100644 --- a/benches/parse-and-format.rs +++ b/benches/parse-and-format.rs @@ -39,7 +39,7 @@ fn benchmark(c: &mut Criterion) { SegmentProcessor::new(&parser, &formatter, &filter, SegmentProcessorOptions::default()); let mut buf = Vec::new(); b.iter(|| { - processor.process(record, &mut buf, "", &mut RecordIgnorer {}); + processor.process(record, &mut buf, "", None, &mut RecordIgnorer {}); buf.clear(); }); }); diff --git a/src/app.rs b/src/app.rs index fdf8a735..7818f06f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -231,7 +231,7 @@ impl App { match segment { Segment::Complete(segment) => { let mut buf = bfo.new_buf(); - processor.process(segment.data(), &mut buf, prefix, &mut RecordIgnorer{}); + processor.process(segment.data(), &mut buf, prefix, None, &mut RecordIgnorer{}); sfi.recycle(segment); if let Err(_) = txo.send((i, buf.into())) { break; @@ -384,6 +384,7 @@ impl App { line.bytes(), &mut buf, "", + Some(1), &mut |record: &Record, location: Range| { if let Some(ts) = &record.ts { if let Some(unix_ts) = ts.unix_utc() { @@ -562,7 +563,7 @@ impl App { Segment::Complete(segment) => { let mut buf = bfo.new_buf(); let mut index_builder = TimestampIndexBuilder{result: TimestampIndex::new(j)}; - processor.process(segment.data(), &mut buf, prefix, &mut index_builder); + processor.process(segment.data(), &mut buf, prefix, None, &mut index_builder); sfi.recycle(segment); if txo.send((i, buf, index_builder.result)).is_err() { return; @@ -795,7 +796,14 @@ impl App { // --- pub trait SegmentProcess { - fn process(&mut self, data: &[u8], buf: &mut Vec, prefix: &str, observer: &mut O); + fn process( + &mut self, + data: &[u8], + buf: &mut Vec, + prefix: &str, + limit: Option, + observer: &mut O, + ); } // --- @@ -836,10 +844,13 @@ impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProc impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProcess for SegmentProcessor<'a, Formatter, Filter> { - fn process(&mut self, data: &[u8], buf: &mut Vec, prefix: &str, observer: &mut O) + fn process(&mut self, data: &[u8], buf: &mut Vec, prefix: &str, limit: Option, observer: &mut O) where O: RecordObserver, { + let mut i = 0; + let limit = limit.unwrap_or(usize::MAX); + for line in self.options.delimiter.clone().into_searcher().split(data) { if line.len() == 0 { buf.push(b'\n'); @@ -854,6 +865,7 @@ impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProc let mut produced_some = false; let mut last_offset = 0; while let Some(Ok(ar)) = stream.next() { + i += 1; last_offset = ar.offsets.end; if parsed_some { buf.push(b'\n'); @@ -872,6 +884,9 @@ impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProc observer.observe_record(&record, begin..end); produced_some = true; } + if i >= limit { + break; + } } let remainder = if parsed_some { &line[last_offset..] } else { line }; if remainder.len() != 0 && self.show_unparsed() { @@ -1204,6 +1219,44 @@ mod tests { ); } + #[test] + fn test_sort_with_clingy_lines() { + let input = input(concat!( + r#"{"level":"debug","ts":"2024-01-25T19:10:20.435369+01:00","msg":"m2"}"#, + r#"{"level":"debug","ts":"2024-01-25T19:09:16.860711+01:00","msg":"m1"}"#, + "\n", + )); + + let mut output = Vec::new(); + let app = App::new(options().with_sort(true)); + app.run(vec![input], &mut output).unwrap(); + assert_eq!( + std::str::from_utf8(&output).unwrap(), + concat!( + "2024-01-25 18:09:16.860 |DBG| m1\n", + "2024-01-25 18:10:20.435 |DBG| m2\n", + ), + ); + } + + #[test] + fn test_sort_with_clingy_and_invalid_lines() { + let input = input(concat!( + r#"{"level":"debug","ts":"2024-01-25T19:10:20.435369+01:00","msg":"m2"}"#, + r#"{invalid}"#, + r#"{"level":"debug","ts":"2024-01-25T19:09:16.860711+01:00","msg":"m1"}"#, + "\n", + )); + + let mut output = Vec::new(); + let app = App::new(options().with_sort(true)); + app.run(vec![input], &mut output).unwrap(); + assert_eq!( + std::str::from_utf8(&output).unwrap(), + "2024-01-25 18:10:20.435 |DBG| m2\n", + ); + } + fn input>(s: S) -> InputHolder { InputHolder::new(InputReference::Stdin, Some(Box::new(Cursor::new(s.into())))) } diff --git a/src/index.rs b/src/index.rs index b0176fb5..a5bee5c8 100644 --- a/src/index.rs +++ b/src/index.rs @@ -401,10 +401,12 @@ impl Indexer { let mut prev_ts = None; let mut lines = Vec::<(Option, u32, u32)>::with_capacity(segment.data().len() / 512); let mut offset = 0; - for (i, data) in rtrim(segment.data(), b'\n').split(|c| *c == b'\n').enumerate() { + let mut i = 0; + for data in rtrim(segment.data(), b'\n').split(|c| *c == b'\n') { let data_len = data.len(); let data = strip(data, b'\r'); let mut ts = None; + let mut rel = 0; if data.len() != 0 { let mut stream = RawRecord::parser() .allow_prefix(self.allow_prefix) @@ -434,18 +436,25 @@ impl Indexer { if ts < prev_ts { sorted = false; } - prev_ts = ts; stat.add_valid(ts, flags); + lines.push((ts.or(prev_ts), i as u32, offset + ar.offsets.start as u32)); + rel = ar.offsets.end; + i += 1; + prev_ts = ts; } _ => { stat.add_invalid(); + lines.push((ts.or(prev_ts), i as u32, offset + rel as u32)); + i += 1; + break; } } } } else { stat.add_invalid(); + lines.push((ts.or(prev_ts), i as u32, offset)); + i += 1; } - lines.push((ts.or(prev_ts), i as u32, offset)); offset += data_len as u32 + 1; } let chronology = if sorted {