Skip to content

Commit

Permalink
fix: fixed sorting when some json messages are on a single line (#250)
Browse files Browse the repository at this point in the history
  • Loading branch information
pamburus authored May 5, 2024
1 parent 6e0787e commit 4a30f53
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ members = [".", "crate/encstr"]
[workspace.package]
repository = "https://github.com/pamburus/hl"
authors = ["Pavel Ivanov <[email protected]>"]
version = "0.29.2-alpha.4"
version = "0.29.2-alpha.5"
edition = "2021"
license = "MIT"

Expand Down
2 changes: 1 addition & 1 deletion benches/parse-and-format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn benchmark(c: &mut Criterion) {
SegmentProcessor::new(&parser, &formatter, &filter, SegmentProcessorOptions::default());
let mut buf = Vec::new();
b.iter(|| {
processor.process(record, &mut buf, "", &mut RecordIgnorer {});
processor.process(record, &mut buf, "", None, &mut RecordIgnorer {});
buf.clear();
});
});
Expand Down
61 changes: 57 additions & 4 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl App {
match segment {
Segment::Complete(segment) => {
let mut buf = bfo.new_buf();
processor.process(segment.data(), &mut buf, prefix, &mut RecordIgnorer{});
processor.process(segment.data(), &mut buf, prefix, None, &mut RecordIgnorer{});
sfi.recycle(segment);
if let Err(_) = txo.send((i, buf.into())) {
break;
Expand Down Expand Up @@ -384,6 +384,7 @@ impl App {
line.bytes(),
&mut buf,
"",
Some(1),
&mut |record: &Record, location: Range<usize>| {
if let Some(ts) = &record.ts {
if let Some(unix_ts) = ts.unix_utc() {
Expand Down Expand Up @@ -562,7 +563,7 @@ impl App {
Segment::Complete(segment) => {
let mut buf = bfo.new_buf();
let mut index_builder = TimestampIndexBuilder{result: TimestampIndex::new(j)};
processor.process(segment.data(), &mut buf, prefix, &mut index_builder);
processor.process(segment.data(), &mut buf, prefix, None, &mut index_builder);
sfi.recycle(segment);
if txo.send((i, buf, index_builder.result)).is_err() {
return;
Expand Down Expand Up @@ -795,7 +796,14 @@ impl App {
// ---

/// Abstraction over something that processes one segment of raw input bytes.
///
/// Implementors read `data`, append their output to `buf`, and report
/// records to `observer`.
// NOTE(review): two `process` declarations appear below, one without and one
// with the `limit` parameter. This looks like the old and the new line of a
// diff hunk rendered together; confirm which one is current.
pub trait SegmentProcess {
fn process<O: RecordObserver>(&mut self, data: &[u8], buf: &mut Vec<u8>, prefix: &str, observer: &mut O);
// `limit`: the implementation visible in this change treats `None` as
// `usize::MAX` and stops reading records from the current line once its
// running count of parsed records reaches the limit.
// NOTE(review): the role of `prefix` is not visible here; verify against the
// implementation before documenting it.
fn process<O: RecordObserver>(
&mut self,
data: &[u8],
buf: &mut Vec<u8>,
prefix: &str,
limit: Option<usize>,
observer: &mut O,
);
}

// ---
Expand Down Expand Up @@ -836,10 +844,13 @@ impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProc
impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProcess
for SegmentProcessor<'a, Formatter, Filter>
{
fn process<O>(&mut self, data: &[u8], buf: &mut Vec<u8>, prefix: &str, observer: &mut O)
fn process<O>(&mut self, data: &[u8], buf: &mut Vec<u8>, prefix: &str, limit: Option<usize>, observer: &mut O)
where
O: RecordObserver,
{
let mut i = 0;
let limit = limit.unwrap_or(usize::MAX);

for line in self.options.delimiter.clone().into_searcher().split(data) {
if line.len() == 0 {
buf.push(b'\n');
Expand All @@ -854,6 +865,7 @@ impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProc
let mut produced_some = false;
let mut last_offset = 0;
while let Some(Ok(ar)) = stream.next() {
i += 1;
last_offset = ar.offsets.end;
if parsed_some {
buf.push(b'\n');
Expand All @@ -872,6 +884,9 @@ impl<'a, Formatter: RecordWithSourceFormatter, Filter: RecordFilter> SegmentProc
observer.observe_record(&record, begin..end);
produced_some = true;
}
if i >= limit {
break;
}
}
let remainder = if parsed_some { &line[last_offset..] } else { line };
if remainder.len() != 0 && self.show_unparsed() {
Expand Down Expand Up @@ -1204,6 +1219,44 @@ mod tests {
);
}

/// Two JSON records placed back to back on one physical line (no delimiter
/// between them) must still come out ordered by timestamp in sort mode.
#[test]
fn test_sort_with_clingy_lines() {
    // The later record (m2) comes first on purpose, so sorting has to swap them.
    let source = input(concat!(
        r#"{"level":"debug","ts":"2024-01-25T19:10:20.435369+01:00","msg":"m2"}"#,
        r#"{"level":"debug","ts":"2024-01-25T19:09:16.860711+01:00","msg":"m1"}"#,
        "\n",
    ));
    let expected = concat!(
        "2024-01-25 18:09:16.860 |DBG| m1\n",
        "2024-01-25 18:10:20.435 |DBG| m2\n",
    );

    let mut rendered = Vec::new();
    App::new(options().with_sort(true))
        .run(vec![source], &mut rendered)
        .unwrap();

    assert_eq!(std::str::from_utf8(&rendered).unwrap(), expected);
}

/// When an unparsable chunk sits between two records on the same physical
/// line, sort mode is expected to emit only the record that precedes it.
#[test]
fn test_sort_with_clingy_and_invalid_lines() {
    let source = input(concat!(
        r#"{"level":"debug","ts":"2024-01-25T19:10:20.435369+01:00","msg":"m2"}"#,
        r#"{invalid}"#,
        r#"{"level":"debug","ts":"2024-01-25T19:09:16.860711+01:00","msg":"m1"}"#,
        "\n",
    ));
    let expected = "2024-01-25 18:10:20.435 |DBG| m2\n";

    let mut rendered = Vec::new();
    App::new(options().with_sort(true))
        .run(vec![source], &mut rendered)
        .unwrap();

    assert_eq!(std::str::from_utf8(&rendered).unwrap(), expected);
}

/// Builds an `InputHolder` labelled as stdin whose stream is an in-memory
/// cursor over the given text.
fn input<S: Into<String>>(s: S) -> InputHolder {
    let text: String = s.into();
    let stream = Cursor::new(text);
    InputHolder::new(InputReference::Stdin, Some(Box::new(stream)))
}
Expand Down
15 changes: 12 additions & 3 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,12 @@ impl Indexer {
let mut prev_ts = None;
let mut lines = Vec::<(Option<Timestamp>, u32, u32)>::with_capacity(segment.data().len() / 512);
let mut offset = 0;
for (i, data) in rtrim(segment.data(), b'\n').split(|c| *c == b'\n').enumerate() {
let mut i = 0;
for data in rtrim(segment.data(), b'\n').split(|c| *c == b'\n') {
let data_len = data.len();
let data = strip(data, b'\r');
let mut ts = None;
let mut rel = 0;
if data.len() != 0 {
let mut stream = RawRecord::parser()
.allow_prefix(self.allow_prefix)
Expand Down Expand Up @@ -434,18 +436,25 @@ impl Indexer {
if ts < prev_ts {
sorted = false;
}
prev_ts = ts;
stat.add_valid(ts, flags);
lines.push((ts.or(prev_ts), i as u32, offset + ar.offsets.start as u32));
rel = ar.offsets.end;
i += 1;
prev_ts = ts;
}
_ => {
stat.add_invalid();
lines.push((ts.or(prev_ts), i as u32, offset + rel as u32));
i += 1;
break;
}
}
}
} else {
stat.add_invalid();
lines.push((ts.or(prev_ts), i as u32, offset));
i += 1;
}
lines.push((ts.or(prev_ts), i as u32, offset));
offset += data_len as u32 + 1;
}
let chronology = if sorted {
Expand Down

0 comments on commit 4a30f53

Please sign in to comment.