Skip to content

Commit

Permalink
feat: added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pamburus committed Dec 8, 2024
1 parent 1746923 commit 0ad49ac
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 34 deletions.
15 changes: 6 additions & 9 deletions build/ci/coverage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,15 @@ function clean() {
function test() {
cargo test --tests --workspace
cargo build
${MAIN_EXECUTABLE:?} > /dev/null
${MAIN_EXECUTABLE:?} --config= > /dev/null
${MAIN_EXECUTABLE:?} --config= --help > /dev/null
${MAIN_EXECUTABLE:?} --config=etc/defaults/config-k8s.yaml > /dev/null
${MAIN_EXECUTABLE:?} --config=etc/defaults/config-ecs.yaml > /dev/null
${MAIN_EXECUTABLE:?} --shell-completions bash > /dev/null
${MAIN_EXECUTABLE:?} --man-page > /dev/null
${MAIN_EXECUTABLE:?} --list-themes > /dev/null
echo "" | ${MAIN_EXECUTABLE:?} --concurrency 4 > /dev/null
if ${MAIN_EXECUTABLE:?} -s test.log.gz 2>/dev/null > /dev/null; then
echo "Expected combination of options `-s 1234.log.gz` to fail"
exit 1
fi
${MAIN_EXECUTABLE:?} --config= --shell-completions bash > /dev/null
${MAIN_EXECUTABLE:?} --config= --man-page > /dev/null
${MAIN_EXECUTABLE:?} --config= --list-themes > /dev/null
${MAIN_EXECUTABLE:?} --config= sample/prometheus.log -P > /dev/null
echo "" | ${MAIN_EXECUTABLE:?} --config= --concurrency 4 > /dev/null
}

function merge() {
Expand Down
1 change: 1 addition & 0 deletions sample/test.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"}
183 changes: 158 additions & 25 deletions src/input.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// std imports
use std::cmp::min;
use std::convert::TryInto;
use std::fs::{File, Metadata};
use std::io::{self, stdin, BufRead, BufReader, Cursor, Read, Seek, SeekFrom};
use std::mem::size_of_val;
use std::ops::{Deref, Range};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::{
cmp::min,
convert::TryInto,
fs::{File, Metadata},
io::{self, stdin, BufRead, BufReader, Cursor, Read, Seek, SeekFrom},
mem::size_of_val,
ops::{Deref, Range},
path::PathBuf,
sync::{Arc, Mutex},
};

// third-party imports
use deko::{bufread::AnyDecoder, Format};
Expand All @@ -29,18 +31,12 @@ pub type BufPool = SQPool<Vec<u8>>;
// ---

/// A reference to an input file or stdin.
#[derive(Clone)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum InputReference {
Stdin,
File(PathBuf),
}

impl Into<io::Result<InputHolder>> for InputReference {
fn into(self) -> io::Result<InputHolder> {
self.hold()
}
}

impl InputReference {
/// Preliminarily opens the input file to ensure it exists and is readable
/// and protect it from being suddenly deleted while we need it.
Expand Down Expand Up @@ -230,15 +226,12 @@ impl Input {
const BUF_SIZE: usize = 64 * 1024;
let mut scratch = [0; BUF_SIZE];
let mut count: u64 = 0;
let mut prev_pos = stream.seek(SeekFrom::End(0))?;
let mut pos = prev_pos;
while pos > 0 {
pos -= min(BUF_SIZE as u64, pos);
let mut pos = stream.seek(SeekFrom::End(0))?;
while pos != 0 {
let n = min(BUF_SIZE as u64, pos);
pos -= n;
pos = stream.seek(SeekFrom::Start(pos))?;
if pos == prev_pos {
break;
}
let bn = min(BUF_SIZE, (prev_pos - pos) as usize);
let bn = n as usize;
let buf = scratch[..bn].as_mut();

stream.read_exact(buf)?;
Expand All @@ -252,8 +245,6 @@ impl Input {
count += 1;
}
}

prev_pos = pos;
}
stream.seek(SeekFrom::Start(pos as u64))?;
Ok(())
Expand Down Expand Up @@ -833,6 +824,113 @@ mod tests {
use std::io::ErrorKind;
use std::io::Read;

#[test]
fn test_input_reference() {
let reference = InputReference::Stdin;
assert_eq!(reference.description(), "<stdin>");
assert_eq!(reference.path(), None);
let input = reference.open().unwrap();
assert_eq!(input.reference, reference);
let reference = InputReference::File(PathBuf::from("test.log"));
assert_eq!(reference.description(), "file '\u{1b}[33mtest.log\u{1b}[0m'");
assert_eq!(reference.path(), Some(&PathBuf::from("test.log")));
}

#[test]
fn test_input_holder() {
let reference = InputReference::File(PathBuf::from("sample/test.log"));
let holder = InputHolder::new(reference, None);
let mut stream = holder.stream().unwrap();
let mut buf = Vec::new();
let n = stream.read_to_end(&mut buf).unwrap();
assert!(matches!(stream, Stream::RandomAccess(_)));
let stream = stream.as_sequential();
let meta = stream.metadata().unwrap();
assert_eq!(meta.is_some(), true);
assert_eq!(n, 70);
assert_eq!(
buf,
br#"{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"}"#
);
}

#[test]
fn test_input() {
let input = Input::stdin().unwrap();
assert!(matches!(input.stream, Stream::Sequential(_)));
assert_eq!(input.reference.description(), "<stdin>");
let input = Input::open(&PathBuf::from("sample/prometheus.log")).unwrap();
assert!(matches!(input.stream, Stream::RandomAccess(_)));
assert_eq!(
input.reference.description(),
"file '\u{1b}[33msample/prometheus.log\u{1b}[0m'"
);
}

#[test]
fn test_input_tail() {
let input = Input::stdin().unwrap().tail(1).unwrap();
assert!(matches!(input.stream, Stream::Sequential(_)));

for &(filename, requested, expected) in &[
("sample/test.log", 1, 1),
("sample/test.log", 2, 1),
("sample/prometheus.log", 2, 2),
] {
let input = Input::open(&PathBuf::from(filename)).unwrap().tail(requested).unwrap();
let mut buf = Vec::new();
let n = input.stream.into_sequential().read_to_end(&mut buf).unwrap();
assert!(n > 0);
assert_eq!(buf.lines().count(), expected);
}
}

#[test]
fn test_stream() {
let stream = Stream::Sequential(Box::new(Cursor::new(b"test")));
let stream = stream.verified().decoded().tagged(InputReference::Stdin);
assert!(matches!(stream, Stream::Sequential(_)));
let mut buf = Vec::new();
let n = stream.into_sequential().read_to_end(&mut buf).unwrap();
assert_eq!(n, 4);
assert_eq!(buf, b"test");

let stream = Stream::RandomAccess(Box::new(UnseekableReader(Cursor::new(b"test"))));
let stream = stream.tagged(InputReference::Stdin).verified();
assert!(matches!(stream, Stream::Sequential(_)));
let mut buf = Vec::new();
let n = stream.into_sequential().read_to_end(&mut buf).unwrap();
assert_eq!(n, 4);
assert_eq!(buf, b"test");

let stream = Stream::RandomAccess(Box::new(UnseekableReader(Cursor::new(b"test"))));
let stream = stream.tagged(InputReference::Stdin).decoded();
assert!(matches!(stream, Stream::Sequential(_)));
let mut buf = Vec::new();
let n = stream.into_sequential().read_to_end(&mut buf).unwrap();
assert_eq!(n, 4);
assert_eq!(buf, b"test");

// echo 't' | gzip -cf | xxd -p | sed 's/\(..\)/\\x\1/g'
let data = b"\x1f\x8b\x08\x00\x03\x87\x55\x67\x00\x03\x2b\xe1\x02\x00\x13\x47\x5f\xea\x02\x00\x00\x00";
let stream = Stream::RandomAccess(Box::new(Cursor::new(data).with_metadata(None)));
let stream = stream.tagged(InputReference::Stdin).decoded();
assert!(matches!(stream, Stream::Sequential(_)));
let mut buf = Vec::new();
let n = stream.into_sequential().read_to_end(&mut buf).unwrap();
assert_eq!(n, 2);
assert_eq!(buf, b"t\n");

let stream = Stream::RandomAccess(Box::new(FailingReader));
let stream = stream.tagged(InputReference::Stdin).decoded();
assert!(matches!(stream, Stream::Sequential(_)));
let mut buf = [0; 128];
let result = stream.into_sequential().read(&mut buf);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind(), ErrorKind::Other);
}

#[test]
fn test_input_read_error() {
let reference = InputReference::File(PathBuf::from("test.log"));
Expand Down Expand Up @@ -881,6 +979,8 @@ mod tests {
assert_eq!(buf, b"test\n");
}

// ---

struct FailingReader;

impl Read for FailingReader {
Expand All @@ -889,9 +989,42 @@ mod tests {
}
}

impl Seek for FailingReader {
fn seek(&mut self, from: SeekFrom) -> std::io::Result<u64> {
match from {
SeekFrom::Start(0) => Ok(0),
SeekFrom::Current(0) => Ok(0),
SeekFrom::End(0) => Ok(0),
_ => Err(std::io::Error::new(std::io::ErrorKind::Other, "seek error")),

Check warning on line 998 in src/input.rs

View check run for this annotation

Codecov / codecov/patch

src/input.rs#L997-L998

Added lines #L997 - L998 were not covered by tests
}
}
}

impl Meta for FailingReader {
fn metadata(&self) -> std::io::Result<Option<std::fs::Metadata>> {
Ok(None)
}
}

// ---

struct UnseekableReader<R>(R);

impl<R: Read> Read for UnseekableReader<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.read(buf)
}
}

impl<R> Seek for UnseekableReader<R> {
fn seek(&mut self, _: SeekFrom) -> std::io::Result<u64> {
Err(std::io::Error::new(std::io::ErrorKind::Other, "seek error"))
}
}

impl<R> Meta for UnseekableReader<R> {
fn metadata(&self) -> std::io::Result<Option<std::fs::Metadata>> {
Ok(None)
}

Check warning on line 1028 in src/input.rs

View check run for this annotation

Codecov / codecov/patch

src/input.rs#L1026-L1028

Added lines #L1026 - L1028 were not covered by tests
}
}

0 comments on commit 0ad49ac

Please sign in to comment.