From 158df81644585431507129d8a6965d37786d0595 Mon Sep 17 00:00:00 2001 From: Pavel Ivanov Date: Sun, 8 Dec 2024 10:39:35 +0100 Subject: [PATCH] feat: added unit tests --- Cargo.lock | 68 +++++++------- Cargo.toml | 2 +- build/ci/coverage.sh | 15 ++- sample/test.log | 1 + src/index.rs | 85 ++++++++++++++--- src/input.rs | 214 ++++++++++++++++++++++++++++++++++++++----- src/settings.rs | 9 +- src/themecfg.rs | 2 +- 8 files changed, 311 insertions(+), 85 deletions(-) create mode 100644 sample/test.log diff --git a/Cargo.lock b/Cargo.lock index 845cc0f3..9c187d7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -125,7 +125,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -196,7 +196,7 @@ checksum = "62f7e0e71f98d6c71bfe42b0a7a47d0f870ad808401fad2d44fa156ed5b0ae03" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -381,7 +381,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -605,7 +605,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -616,7 +616,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -632,14 +632,23 @@ dependencies = [ ] [[package]] -name = "derive_deref" -version = "1.1.1" +name = "derive_more" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcdbcee2d9941369faba772587a565f4f534e42cb8d17e5295871de730163b2b" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn", ] [[package]] @@ -724,7 +733,7 @@ checksum = "f282cfdfe92516eb26c2af8589c274c7c17681f5ecc03c18255fe741c6aa64eb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -752,7 +761,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -952,7 +961,7 @@ dependencies = [ "crossbeam-queue", "crossbeam-utils", "deko", - "derive_deref", + "derive_more", "dirs", "dirs-sys", "encstr", @@ -1415,7 +1424,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -1641,7 +1650,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.90", + "syn", "walkdir", ] @@ -1723,7 +1732,7 @@ checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -1857,18 +1866,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.90", -] - -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", + "syn", ] [[package]] @@ -1890,7 +1888,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -1929,7 +1927,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -1940,7 +1938,7 @@ checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -2079,7 +2077,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.90", + "syn", "wasm-bindgen-shared", ] @@ -2101,7 +2099,7 @@ checksum = "98c9ae5a76e46f4deecd0f0255cc223cfa18dc9b261213b8aa0c7b36f61b3f1d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2370,7 +2368,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", "synstructure", ] @@ -2391,7 +2389,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -2411,7 +2409,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", "synstructure", ] diff --git a/Cargo.toml b/Cargo.toml index a05ebe2c..b214e97d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ crossbeam-channel = "0" crossbeam-queue = "0" crossbeam-utils = "0" deko = "0" -derive_deref = "1" +derive_more = {version="1", features = ["deref", "from"]} dirs = "5" dirs-sys = "0" encstr = { path = "./crate/encstr" } diff --git a/build/ci/coverage.sh b/build/ci/coverage.sh index d31efcfb..c9392605 100755 --- a/build/ci/coverage.sh +++ b/build/ci/coverage.sh @@ -41,18 +41,15 @@ function clean() { function test() { cargo test --tests --workspace cargo build - ${MAIN_EXECUTABLE:?} > /dev/null + ${MAIN_EXECUTABLE:?} --config= > /dev/null ${MAIN_EXECUTABLE:?} --config= --help > /dev/null ${MAIN_EXECUTABLE:?} --config=etc/defaults/config-k8s.yaml > /dev/null ${MAIN_EXECUTABLE:?} --config=etc/defaults/config-ecs.yaml > /dev/null - ${MAIN_EXECUTABLE:?} --shell-completions bash > /dev/null - ${MAIN_EXECUTABLE:?} --man-page > /dev/null - ${MAIN_EXECUTABLE:?} --list-themes > /dev/null - echo "" | ${MAIN_EXECUTABLE:?} --concurrency 4 > /dev/null - if ${MAIN_EXECUTABLE:?} -s test.log.gz 2>/dev/null > /dev/null; then - echo "Expected combination of options `-s 1234.log.gz` to fail" - exit 1 - fi + ${MAIN_EXECUTABLE:?} --config= --shell-completions bash > /dev/null + ${MAIN_EXECUTABLE:?} --config= --man-page > /dev/null + ${MAIN_EXECUTABLE:?} --config= --list-themes > /dev/null + ${MAIN_EXECUTABLE:?} --config= sample/prometheus.log -P > /dev/null + echo "" | ${MAIN_EXECUTABLE:?} --config= --concurrency 4 > /dev/null } function merge() { diff --git a/sample/test.log b/sample/test.log new file mode 100644 index 00000000..8d66ed89 --- /dev/null +++ b/sample/test.log @@ -0,0 +1 @@ +{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"} \ No newline at end of file diff --git a/src/index.rs b/src/index.rs index 194f407d..a9a51c1c 100644 --- a/src/index.rs +++ b/src/index.rs @@ -12,12 +12,12 @@ // std imports use std::{ cmp::{max, min}, - convert::{TryFrom, TryInto}, + convert::{Into, TryFrom, TryInto}, fmt::{self, Display}, fs::File, io::{Read, Write}, iter::empty, - num::NonZeroU32, + num::{NonZero, NonZeroU32}, path::{Path, PathBuf}, sync::Arc, time::{SystemTime, UNIX_EPOCH}, @@ -29,7 +29,9 @@ use closure::closure; use crossbeam_channel as channel; use crossbeam_channel::RecvError; use crossbeam_utils::thread; +use derive_more::{Deref, From}; use itertools::izip; +use nonzero_ext::nonzero; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -125,9 +127,10 @@ impl std::ops::Sub for Timestamp { // --- +#[derive(Default)] pub struct IndexerSettings<'a> { - buffer_size: NonZeroU32, - max_message_size: NonZeroU32, + buffer_size: BufferSize, + max_message_size: MessageSize, fields: &'a PredefinedFields, delimiter: Delimiter, allow_prefix: bool, @@ -137,8 +140,8 @@ pub struct IndexerSettings<'a> { impl<'a> IndexerSettings<'a> { pub fn new( - buffer_size: NonZeroU32, - max_message_size: NonZeroU32, + buffer_size: BufferSize, + max_message_size: MessageSize, fields: &'a PredefinedFields, delimiter: Delimiter, allow_prefix: bool, @@ -174,6 +177,64 @@ impl<'a> IndexerSettings<'a> { } } +#[derive(Deref, From, Serialize, Deserialize)] +pub struct BufferSize(NonZeroU32); + +impl Default for BufferSize { + fn default() -> Self { + Self(nonzero!(4 * 1024u32)) + } +} + +impl TryFrom> for BufferSize { + type Error = std::num::TryFromIntError; + + fn try_from(value: NonZero) -> std::result::Result { + Ok(Self(value.try_into()?)) + } +} + +impl From for NonZeroU32 { + fn from(value: BufferSize) -> NonZeroU32 { + value.0.into() + } +} + +impl From for u32 { + fn from(value: BufferSize) -> u32 { + value.0.into() + } +} + +#[derive(Deref, From, Serialize, Deserialize)] +pub struct MessageSize(NonZeroU32); + +impl Default for MessageSize { + fn default() -> Self { + Self(nonzero!(64 * 1024u32)) + } +} + +impl TryFrom> for MessageSize { + type Error = std::num::TryFromIntError; + + fn try_from(value: NonZero) -> std::result::Result { + Ok(Self(value.try_into()?)) + } +} + +impl From for NonZeroU32 { + fn from(value: MessageSize) -> NonZeroU32 { + value.0.into() + } +} + +impl From for u32 { + fn from(value: MessageSize) -> u32 { + value.0.into() + } +} + // --- /// Allows log files indexing to enable message sorting. @@ -290,9 +351,7 @@ impl Indexer { self.build_index_from_stream(stream, &source_path, &index_path, meta, existing_index) } - /// Builds index for the given stream. - /// - /// Builds the index and returns it. + /// Builds an in-memory index for the given stream. pub fn index_in_memory(&self, input: &mut Reader) -> Result { self.process_file( &PathBuf::from(""), @@ -1092,8 +1151,8 @@ mod tests { 1, PathBuf::from("/tmp/cache"), IndexerSettings::new( - NonZeroU32::new(1024).unwrap(), - NonZeroU32::new(1024).unwrap(), + BufferSize::from(nonzero!(1024u32)), + MessageSize::from(nonzero!(1024u32)), &PredefinedFields::default(), Delimiter::default(), false, @@ -1154,8 +1213,8 @@ mod tests { 1, PathBuf::from("/tmp/cache"), IndexerSettings::new( - NonZeroU32::new(1024).unwrap(), - NonZeroU32::new(1024).unwrap(), + BufferSize::from(nonzero!(1024u32)), + MessageSize::from(nonzero!(1024u32)), &PredefinedFields::default(), Delimiter::default(), false, diff --git a/src/input.rs b/src/input.rs index f3332ba3..46f8b86c 100644 --- a/src/input.rs +++ b/src/input.rs @@ -1,12 +1,14 @@ // std imports -use std::cmp::min; -use std::convert::TryInto; -use std::fs::{File, Metadata}; -use std::io::{self, stdin, BufRead, BufReader, Cursor, Read, Seek, SeekFrom}; -use std::mem::size_of_val; -use std::ops::{Deref, Range}; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::{ + cmp::min, + convert::TryInto, + fs::{File, Metadata}, + io::{self, stdin, BufRead, BufReader, Cursor, Read, Seek, SeekFrom}, + mem::size_of_val, + ops::{Deref, Range}, + path::PathBuf, + sync::{Arc, Mutex}, +}; // third-party imports use deko::{bufread::AnyDecoder, Format}; @@ -29,18 +31,12 @@ pub type BufPool = SQPool>; // --- /// A reference to an input file or stdin. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Debug)] pub enum InputReference { Stdin, File(PathBuf), } -impl Into> for InputReference { - fn into(self) -> io::Result { - self.hold() - } -} - impl InputReference { /// Preliminarily opens the input file to ensure it exists and is readable /// and protect it from being suddenly deleted while we need it. @@ -230,15 +226,12 @@ impl Input { const BUF_SIZE: usize = 64 * 1024; let mut scratch = [0; BUF_SIZE]; let mut count: u64 = 0; - let mut prev_pos = stream.seek(SeekFrom::End(0))?; - let mut pos = prev_pos; - while pos > 0 { - pos -= min(BUF_SIZE as u64, pos); + let mut pos = stream.seek(SeekFrom::End(0))?; + while pos != 0 { + let n = min(BUF_SIZE as u64, pos); + pos -= n; pos = stream.seek(SeekFrom::Start(pos))?; - if pos == prev_pos { - break; - } - let bn = min(BUF_SIZE, (prev_pos - pos) as usize); + let bn = n as usize; let buf = scratch[..bn].as_mut(); stream.read_exact(buf)?; @@ -252,8 +245,6 @@ impl Input { count += 1; } } - - prev_pos = pos; } stream.seek(SeekFrom::Start(pos as u64))?; Ok(()) @@ -353,6 +344,16 @@ impl Read for Stream { } } +impl Meta for Stream { + #[inline] + fn metadata(&self) -> io::Result> { + match self { + Self::Sequential(stream) => stream.metadata(), + Self::RandomAccess(stream) => stream.metadata(), + } + } +} + // --- /// A wrapper around a stream that adds context to the returned errors. @@ -829,10 +830,123 @@ impl Meta for WithMetadata { #[cfg(test)] mod tests { + use itertools::Itertools; + + use crate::index::IndexerSettings; + use super::*; use std::io::ErrorKind; use std::io::Read; + #[test] + fn test_input_reference() { + let reference = InputReference::Stdin; + assert_eq!(reference.description(), ""); + assert_eq!(reference.path(), None); + let input = reference.open().unwrap(); + assert_eq!(input.reference, reference); + let reference = InputReference::File(PathBuf::from("test.log")); + assert_eq!(reference.description(), "file '\u{1b}[33mtest.log\u{1b}[0m'"); + assert_eq!(reference.path(), Some(&PathBuf::from("test.log"))); + } + + #[test] + fn test_input_holder() { + let reference = InputReference::File(PathBuf::from("sample/test.log")); + let holder = InputHolder::new(reference, None); + let mut stream = holder.stream().unwrap(); + let mut buf = Vec::new(); + let n = stream.read_to_end(&mut buf).unwrap(); + assert!(matches!(stream, Stream::RandomAccess(_))); + let stream = stream.as_sequential(); + let meta = stream.metadata().unwrap(); + assert_eq!(meta.is_some(), true); + assert_eq!(n, 70); + assert_eq!( + buf, + br#"{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"}"# + ); + } + + #[test] + fn test_input() { + let input = Input::stdin().unwrap(); + assert!(matches!(input.stream, Stream::Sequential(_))); + assert_eq!(input.reference.description(), ""); + let input = Input::open(&PathBuf::from("sample/prometheus.log")).unwrap(); + assert!(matches!(input.stream, Stream::RandomAccess(_))); + assert_eq!( + input.reference.description(), + "file '\u{1b}[33msample/prometheus.log\u{1b}[0m'" + ); + } + + #[test] + fn test_input_tail() { + let input = Input::stdin().unwrap().tail(1).unwrap(); + assert!(matches!(input.stream, Stream::Sequential(_))); + + for &(filename, requested, expected) in &[ + ("sample/test.log", 1, 1), + ("sample/test.log", 2, 1), + ("sample/prometheus.log", 2, 2), + ] { + let input = Input::open(&PathBuf::from(filename)).unwrap().tail(requested).unwrap(); + let mut buf = Vec::new(); + let n = input.stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert!(n > 0); + assert_eq!(buf.lines().count(), expected); + } + } + + #[test] + fn test_stream() { + let stream = Stream::Sequential(Box::new(Cursor::new(b"test"))); + let stream = stream.verified().decoded().tagged(InputReference::Stdin); + assert!(matches!(stream, Stream::Sequential(_))); + let mut buf = Vec::new(); + let n = stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf, b"test"); + + let stream = Stream::RandomAccess(Box::new(UnseekableReader(Cursor::new(b"test")))); + let stream = stream.tagged(InputReference::Stdin).verified(); + assert!(matches!(stream, Stream::Sequential(_))); + let mut buf = Vec::new(); + let n = stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf, b"test"); + + let stream = Stream::RandomAccess(Box::new(UnseekableReader(Cursor::new(b"test")))); + assert!(matches!(stream.metadata(), Ok(None))); + let stream = stream.tagged(InputReference::Stdin).decoded(); + assert!(matches!(stream, Stream::Sequential(_))); + assert!(matches!(stream.metadata(), Ok(None))); + let mut buf = Vec::new(); + let n = stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf, b"test"); + + // echo 't' | gzip -cf | xxd -p | sed 's/\(..\)/\\x\1/g' + let data = b"\x1f\x8b\x08\x00\x03\x87\x55\x67\x00\x03\x2b\xe1\x02\x00\x13\x47\x5f\xea\x02\x00\x00\x00"; + let stream = Stream::RandomAccess(Box::new(Cursor::new(data).with_metadata(None))); + let stream = stream.tagged(InputReference::Stdin).decoded(); + assert!(matches!(stream, Stream::Sequential(_))); + let mut buf = Vec::new(); + let n = stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert_eq!(n, 2); + assert_eq!(buf, b"t\n"); + + let stream = Stream::RandomAccess(Box::new(FailingReader)); + let stream = stream.tagged(InputReference::Stdin).decoded(); + assert!(matches!(stream, Stream::Sequential(_))); + let mut buf = [0; 128]; + let result = stream.into_sequential().read(&mut buf); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::Other); + } + #[test] fn test_input_read_error() { let reference = InputReference::File(PathBuf::from("test.log")); @@ -881,6 +995,23 @@ mod tests { assert_eq!(buf, b"test\n"); } + #[test] + fn test_indexed_input() { + let data = br#"{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"}\n"#; + let stream = Stream::RandomAccess(Box::new(Cursor::new(data))); + let indexer = Indexer::new(1, PathBuf::new(), IndexerSettings::default()); + let input = IndexedInput::from_stream(InputReference::Stdin, stream, &indexer).unwrap(); + let mut blocks = input.into_blocks().collect_vec(); + assert_eq!(blocks.len(), 1); + let block = blocks.drain(..).next().unwrap(); + assert_eq!(block.lines_valid(), 1); + let mut lines = block.into_lines().unwrap().collect_vec(); + let line = lines.drain(..).next().unwrap(); + assert_eq!(line.bytes(), data); + } + + // --- + struct FailingReader; impl Read for FailingReader { @@ -889,9 +1020,42 @@ mod tests { } } + impl Seek for FailingReader { + fn seek(&mut self, from: SeekFrom) -> std::io::Result { + match from { + SeekFrom::Start(0) => Ok(0), + SeekFrom::Current(0) => Ok(0), + SeekFrom::End(0) => Ok(0), + _ => Err(std::io::Error::new(std::io::ErrorKind::Other, "seek error")), + } + } + } + impl Meta for FailingReader { fn metadata(&self) -> std::io::Result> { Ok(None) } } + + // --- + + struct UnseekableReader(R); + + impl Read for UnseekableReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.0.read(buf) + } + } + + impl Seek for UnseekableReader { + fn seek(&mut self, _: SeekFrom) -> std::io::Result { + Err(std::io::Error::new(std::io::ErrorKind::Other, "seek error")) + } + } + + impl Meta for UnseekableReader { + fn metadata(&self) -> std::io::Result> { + Ok(None) + } + } } diff --git a/src/settings.rs b/src/settings.rs index 84ab32e1..26cd9558 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -5,7 +5,7 @@ use std::include_str; // third-party imports use chrono_tz::Tz; use config::{Config, File, FileFormat}; -use derive_deref::Deref; +use derive_more::Deref; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize, Serializer}; use strum::IntoEnumIterator; @@ -113,6 +113,13 @@ pub struct PredefinedFields { pub caller_line: CallerLineField, } +impl Default for &PredefinedFields { + fn default() -> Self { + static DEFAULT: Lazy = Lazy::new(|| PredefinedFields::default()); + &DEFAULT + } +} + // --- #[derive(Debug, Serialize, Deserialize, Deref, Clone, PartialEq, Eq)] diff --git a/src/themecfg.rs b/src/themecfg.rs index a9540f50..bd4527c7 100644 --- a/src/themecfg.rs +++ b/src/themecfg.rs @@ -9,7 +9,7 @@ use std::{ }; // third-party imports -use derive_deref::Deref; +use derive_more::Deref; use enum_map::Enum; use rust_embed::RustEmbed; use serde::{