Skip to content

Commit

Permalink
Merge pull request #111 from pamburus/feature/prefix
Browse files Browse the repository at this point in the history
new: Added option `--allow-prefix`
  • Loading branch information
pamburus authored Dec 16, 2023
2 parents 31a0594 + 36f111b commit be42643
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ categories = ["command-line-utilities"]
description = "Utility for viewing json-formatted log files."
keywords = ["cli", "human", "log"]
name = "hl"
version = "0.23.1-beta.2"
version = "0.23.1"
edition = "2021"
build = "build.rs"

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ Options:
--theme <THEME> Color theme [env: HL_THEME=] [default: universal]
-r, --raw Output raw JSON messages instead of formatter messages, it can be useful for applying filters and saving results in original format
--raw-fields Disable unescaping and prettifying of field values
--allow-prefix Allow non-JSON prefixes before JSON messages [env: HL_ALLOW_PREFIX=]
--interrupt-ignore-count <INTERRUPT_IGNORE_COUNT> Number of interrupts to ignore, i.e. Ctrl-C (SIGINT) [env: HL_INTERRUPT_IGNORE_COUNT=] [default: 3]
--buffer-size <BUFFER_SIZE> Buffer size [env: HL_BUFFER_SIZE=] [default: "256 KiB"]
--max-message-size <MAX_MESSAGE_SIZE> Maximum message size [env: HL_MAX_MESSAGE_SIZE=] [default: "64 MiB"]
Expand Down
67 changes: 50 additions & 17 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct Options {
pub time_format: DateTimeFormat,
pub raw: bool,
pub raw_fields: bool,
pub allow_prefix: bool,
pub buffer_size: NonZeroUsize,
pub max_message_size: NonZeroUsize,
pub concurrency: usize,
Expand Down Expand Up @@ -133,8 +134,7 @@ impl App {
// spawn processing threads
for (rxi, txo) in izip!(rxi, txo) {
scope.spawn(closure!(ref bfo, ref parser, ref sfi, ref input_badges, |_| {
let formatter = self.formatter();
let mut processor = SegmentProcessor::new(&parser, &formatter, &self.options.filter);
let mut processor = self.new_segment_processor(&parser);
for (i, segment) in rxi.iter() {
let prefix = input_badges.as_ref().map(|b|b[i].as_str()).unwrap_or("");
match segment {
Expand Down Expand Up @@ -266,8 +266,7 @@ impl App {
let mut workers = Vec::with_capacity(n);
for (rxp, txw) in izip!(rxp, txw) {
workers.push(scope.spawn(closure!(ref parser, |_| -> Result<()> {
let formatter = self.formatter();
let mut processor = SegmentProcessor::new(&parser, &formatter, &self.options.filter);
let mut processor = self.new_segment_processor(&parser);
for (block, ts_min, i, j) in rxp.iter() {
let mut buf = Vec::with_capacity(2 * usize::try_from(block.size())?);
let mut items = Vec::with_capacity(2 * usize::try_from(block.lines_valid())?);
Expand All @@ -282,8 +281,6 @@ impl App {
} else {
eprintln!("skipped message because timestamp cannot be parsed: {:#?}", ts)
}
} else {
eprintln!("skipped message with missing timestamp")
}
});
}
Expand Down Expand Up @@ -340,6 +337,7 @@ impl App {
output.write_all(&badges[item.2].as_bytes())?;
}
output.write_all((item.0).1.bytes())?;
output.write_all(&[b'\n'])?;
match item.1.next() {
Some(head) => item.0 = head,
None => drop(workspace.swap_remove(k)),
Expand Down Expand Up @@ -443,8 +441,7 @@ impl App {
let mut workers = Vec::with_capacity(n);
for _ in 0..n {
let worker = scope.spawn(closure!(ref bfo, ref parser, ref sfi, ref input_badges, clone rxi, clone txo, |_| {
let formatter = self.formatter();
let mut processor = SegmentProcessor::new(&parser, &formatter, &self.options.filter);
let mut processor = self.new_segment_processor(&parser);
for (i, j, segment) in rxi.iter() {
let prefix = input_badges.as_ref().map(|b|b[i].as_str()).unwrap_or("");
match segment {
Expand Down Expand Up @@ -674,53 +671,89 @@ impl App {

Some(result)
}

fn new_segment_processor<'a>(&'a self, parser: &'a Parser) -> impl SegmentProcess+'a {
SegmentProcessor::new(parser, self.formatter(), &self.options.filter, self.options.allow_prefix)
}
}

// ---

pub struct SegmentProcessor<'a, F: RecordWithSourceFormatter> {
pub trait SegmentProcess {
fn run<O: RecordObserver>(&mut self, data: &[u8], buf: &mut Vec<u8>, prefix: &str, observer: &mut O);
}

// ---

pub struct SegmentProcessor<'a, F> {
parser: &'a Parser,
formatter: F,
filter: &'a Filter,
allow_prefix: bool,
}

impl<'a, F: RecordWithSourceFormatter> SegmentProcessor<'a, F> {
pub fn new(parser: &'a Parser, formatter: F, filter: &'a Filter) -> Self {
pub fn new(parser: &'a Parser, formatter: F, filter: &'a Filter, allow_prefix: bool) -> Self {
Self {
parser,
formatter,
filter,
allow_prefix,
}
}

pub fn run<O>(&mut self, data: &[u8], buf: &mut Vec<u8>, prefix: &str, observer: &mut O)
fn show_unparsed(&self) -> bool {
self.filter.is_empty()
}
}

impl<'a, F: RecordWithSourceFormatter> SegmentProcess for SegmentProcessor<'a, F> {
fn run<O>(&mut self, data: &[u8], buf: &mut Vec<u8>, prefix: &str, observer: &mut O)
where
O: RecordObserver,
{
for data in rtrim(data, b'\n').split(|c| *c == b'\n') {
if data.len() == 0 {
continue;
}
let extra_prefix = data.split(|c|*c==b'{').next().unwrap();

let extra_prefix = if self.allow_prefix {
data.split(|c|*c==b'{').next().unwrap()
} else {
b""
};

let xn = extra_prefix.len();
let json_data = &data[xn..];
let stream = json::Deserializer::from_slice(json_data).into_iter::<RawRecord>();
let mut stream = StreamDeserializerWithOffsets(stream);
let mut some = false;
let mut parsed_some = false;
let mut produced_some = false;
while let Some(Ok((record, offsets))) = stream.next() {
some = true;
if parsed_some {
buf.push(b'\n');
}
parsed_some = true;
let record = self.parser.parse(record).with_prefix(extra_prefix);
if record.matches(self.filter) {
let begin = buf.len();
buf.extend(prefix.as_bytes());
self.formatter.format_record(buf, record.with_source(&data[offsets.start..xn+offsets.end]));
let end = buf.len();
observer.observe_record(&record, begin..end);
produced_some = true;
}
}
let remainder = if some { &data[xn+stream.0.byte_offset()..] } else { data };
if remainder.len() != 0 && self.filter.is_empty() {
buf.extend_from_slice(remainder);
let remainder = if parsed_some { &data[xn+stream.0.byte_offset()..] } else { data };
if remainder.len() != 0 && self.show_unparsed() {
if !parsed_some {
buf.extend(prefix.as_bytes());
}
if !parsed_some || produced_some {
buf.extend_from_slice(remainder);
buf.push(b'\n');
}
} else if produced_some {
buf.push(b'\n');
}
}
Expand Down
7 changes: 1 addition & 6 deletions src/formatting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub struct RawRecordFormatter {}
impl RecordWithSourceFormatter for RawRecordFormatter {
fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) {
buf.extend_from_slice(rec.source);
buf.push(b'\n');
}
}

Expand Down Expand Up @@ -210,10 +209,6 @@ impl RecordFormatter {
});
};
});
//
// eol
//
buf.push(b'\n')
}

fn format_field<S: StylingPush<Buf>>(
Expand Down Expand Up @@ -520,7 +515,7 @@ mod tests {
]).unwrap(),
extrax: Vec::default(),
}).unwrap(),
String::from("\u{1b}[0;2;3m00-01-02 03:04:05.123 \u{1b}[0;36m|\u{1b}[0;95mDBG\u{1b}[0;36m|\u{1b}[0;2;3m \u{1b}[0;2;4mtl:\u{1b}[0;2;3m \u{1b}[0;1;39mtm \u{1b}[0;32mka\u{1b}[0;2m:\u{1b}[0;33m{ \u{1b}[0;32mva\u{1b}[0;2m:\u{1b}[0;33m{ \u{1b}[0;32mkb\u{1b}[0;2m:\u{1b}[0;94m42\u{1b}[0;33m } }\u{1b}[0;2;3m @ tc\u{1b}[0m\n"),
String::from("\u{1b}[0;2;3m00-01-02 03:04:05.123 \u{1b}[0;36m|\u{1b}[0;95mDBG\u{1b}[0;36m|\u{1b}[0;2;3m \u{1b}[0;2;4mtl:\u{1b}[0;2;3m \u{1b}[0;1;39mtm \u{1b}[0;32mka\u{1b}[0;2m:\u{1b}[0;33m{ \u{1b}[0;32mva\u{1b}[0;2m:\u{1b}[0;33m{ \u{1b}[0;32mkb\u{1b}[0;2m:\u{1b}[0;94m42\u{1b}[0;33m } }\u{1b}[0;2;3m @ tc\u{1b}[0m"),
);
}
}
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ struct Opt {
#[arg(long)]
raw_fields: bool,
//
/// Allow non-JSON prefixes before JSON messages.
#[arg(long, env = "HL_ALLOW_PREFIX")]
allow_prefix: bool,
//
/// Number of interrupts to ignore, i.e. Ctrl-C (SIGINT).
#[arg(
long,
Expand Down Expand Up @@ -363,6 +367,7 @@ fn run() -> Result<()> {
theme: Arc::new(theme),
raw: opt.raw,
raw_fields: opt.raw_fields,
allow_prefix: opt.allow_prefix,
time_format,
buffer_size,
max_message_size,
Expand Down

0 comments on commit be42643

Please sign in to comment.