diff --git a/Cargo.lock b/Cargo.lock index 2274b4f9..cfa2bb23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -125,7 +125,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -196,7 +196,7 @@ checksum = "62f7e0e71f98d6c71bfe42b0a7a47d0f870ad808401fad2d44fa156ed5b0ae03" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -381,7 +381,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -400,6 +400,12 @@ dependencies = [ "roff", ] +[[package]] +name = "clean-path" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaa6b4b263a5d737e9bf6b7c09b72c41a5480aec4d7219af827f6564e950b6a5" + [[package]] name = "closure" version = "0.3.0" @@ -605,7 +611,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -616,7 +622,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -632,14 +638,23 @@ dependencies = [ ] [[package]] -name = "derive_deref" -version = "1.1.1" +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcdbcee2d9941369faba772587a565f4f534e42cb8d17e5295871de730163b2b" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn", ] [[package]] @@ -682,6 +697,12 @@ dependencies = [ "const-random", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "either" version = "1.13.0" @@ -724,7 +745,7 @@ checksum = "f282cfdfe92516eb26c2af8589c274c7c17681f5ecc03c18255fe741c6aa64eb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -752,7 +773,30 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.90", + "syn", +] + +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", ] [[package]] @@ -799,6 +843,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fsevent-sys" version = "4.1.0" @@ -915,7 +965,7 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hl" -version = "0.30.0-alpha.3" +version = "0.30.0-alpha.4" dependencies = [ "bincode", "byte-strings", @@ -927,6 +977,7 @@ dependencies = [ "clap", "clap_complete", "clap_mangen", + "clean-path", "closure", "collection_macros", "config", @@ -935,13 +986,14 @@ dependencies = [ "crossbeam-queue", "crossbeam-utils", "deko", - "derive_deref", + "derive_more", "dirs", "dirs-sys", "encstr", "enum-map", "enumset", "enumset-ext", + "env_logger", 
"flate2", "heapless", "hex", @@ -950,14 +1002,16 @@ dependencies = [ "itertools 0.13.0", "itoa", "kqueue", + "log", "maplit", + "mockall", + "nonzero_ext", "notify", "nu-ansi-term", "num_cpus", "once_cell", "pest", "pest_derive", - "phf", "regex", "rust-embed", "serde", @@ -1245,6 +1299,38 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mockall" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "notify" version = "7.0.0" @@ -1375,7 +1461,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -1395,7 +1481,6 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" dependencies = [ - "phf_macros", "phf_shared", ] @@ -1419,19 +1504,6 @@ dependencies = [ "rand", ] -[[package]] -name = "phf_macros" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3444646e286606587e49f3bcf1679b8cef1dc2c5ecc29ddacaffc305180d464b" -dependencies = [ - "phf_generator", - "phf_shared", - "proc-macro2", - "quote", - "syn 2.0.90", -] - [[package]] name = "phf_shared" version = "0.11.2" @@ -1475,6 +1547,32 @@ dependencies = [ "plotters-backend", ] +[[package]] 
+name = "predicates" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9086cc7640c29a356d1a29fd134380bee9d8f79a17410aa76e7ad295f42c97" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931" + +[[package]] +name = "predicates-tree" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "proc-macro2" version = "1.0.92" @@ -1615,7 +1713,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.90", + "syn", "walkdir", ] @@ -1698,7 +1796,7 @@ checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -1832,18 +1930,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.90", -] - -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", + "syn", ] [[package]] @@ -1865,7 +1952,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -1878,6 +1965,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.69" @@ -1904,7 +1997,7 @@ checksum = 
"4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -1915,7 +2008,7 @@ checksum = "f2f357fcec90b3caef6623a099691be676d033b40a058ac95d2a6ade6fa0c943" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -2060,7 +2153,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.90", + "syn", "wasm-bindgen-shared", ] @@ -2082,7 +2175,7 @@ checksum = "98c9ae5a76e46f4deecd0f0255cc223cfa18dc9b261213b8aa0c7b36f61b3f1d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2360,7 +2453,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", "synstructure", ] @@ -2381,7 +2474,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", ] [[package]] @@ -2401,7 +2494,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.90", + "syn", "synstructure", ] diff --git a/Cargo.toml b/Cargo.toml index 90150dc4..4cf9d8a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [".", "crate/encstr"] [workspace.package] repository = "https://github.com/pamburus/hl" authors = ["Pavel Ivanov "] -version = "0.30.0-alpha.3" +version = "0.30.0-alpha.4" edition = "2021" license = "MIT" @@ -43,13 +43,14 @@ crossbeam-channel = "0" crossbeam-queue = "0" crossbeam-utils = "0" deko = "0" -derive_deref = "1" +derive_more = {version="1", features = ["deref", "from"]} dirs = "5" dirs-sys = "0" encstr = { path = "./crate/encstr" } enum-map = "2" enumset = "1" enumset-ext = { path = "./crate/enumset-ext" } +env_logger = "0" flate2 = "1" heapless = "0" hex = "0" @@ -57,13 +58,14 @@ htp = { git = "https://github.com/pamburus/htp.git" } 
humantime = "2" itertools = "0.13" itoa = { version = "1", default-features = false } +log = "0" +nonzero_ext = "0" notify = { version = "7", features = ["macos_kqueue"] } nu-ansi-term = "0" num_cpus = "1" once_cell = "1" pest = "2" pest_derive = "2" -phf = { version = "0", features = ["macros"] } regex = "1" rust-embed = "8" serde = { version = "1", features = ["derive"] } @@ -88,8 +90,10 @@ kqueue = "1" [dev-dependencies] byte-strings = "0" +clean-path = "0" criterion = "0" maplit = "1" +mockall = "0" stats_alloc = "0" regex = "1" wildmatch = "2" diff --git a/build/ci/coverage.sh b/build/ci/coverage.sh index d31efcfb..c9392605 100755 --- a/build/ci/coverage.sh +++ b/build/ci/coverage.sh @@ -41,18 +41,15 @@ function clean() { function test() { cargo test --tests --workspace cargo build - ${MAIN_EXECUTABLE:?} > /dev/null + ${MAIN_EXECUTABLE:?} --config= > /dev/null ${MAIN_EXECUTABLE:?} --config= --help > /dev/null ${MAIN_EXECUTABLE:?} --config=etc/defaults/config-k8s.yaml > /dev/null ${MAIN_EXECUTABLE:?} --config=etc/defaults/config-ecs.yaml > /dev/null - ${MAIN_EXECUTABLE:?} --shell-completions bash > /dev/null - ${MAIN_EXECUTABLE:?} --man-page > /dev/null - ${MAIN_EXECUTABLE:?} --list-themes > /dev/null - echo "" | ${MAIN_EXECUTABLE:?} --concurrency 4 > /dev/null - if ${MAIN_EXECUTABLE:?} -s test.log.gz 2>/dev/null > /dev/null; then - echo "Expected combination of options `-s 1234.log.gz` to fail" - exit 1 - fi + ${MAIN_EXECUTABLE:?} --config= --shell-completions bash > /dev/null + ${MAIN_EXECUTABLE:?} --config= --man-page > /dev/null + ${MAIN_EXECUTABLE:?} --config= --list-themes > /dev/null + ${MAIN_EXECUTABLE:?} --config= sample/prometheus.log -P > /dev/null + echo "" | ${MAIN_EXECUTABLE:?} --config= --concurrency 4 > /dev/null } function merge() { diff --git a/sample/test.log b/sample/test.log new file mode 100644 index 00000000..f0714e90 --- /dev/null +++ b/sample/test.log @@ -0,0 +1,2 @@ +{"ts":"2024-10-01T01:02:04Z","level":"info","msg":"latest test 
message"} +{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"earlier test message"} diff --git a/src/app.rs b/src/app.rs index 6463848e..7ed72829 100644 --- a/src/app.rs +++ b/src/app.rs @@ -40,6 +40,7 @@ use crate::{ settings::{FieldShowOption, Fields, Formatting}, theme::{Element, StylingPush, Theme}, timezone::Tz, + vfs::LocalFileSystem, IncludeExcludeKeyFilter, {error::*, QueryNone}, }; @@ -219,7 +220,7 @@ impl App { let mut tx = StripedSender::new(txi); let scanner = Scanner::new(sfi, &self.options.delimiter); for (i, mut input) in inputs.into_iter().enumerate() { - for item in scanner.items(&mut input.stream).with_max_segment_size(self.options.max_message_size.into()) { + for item in scanner.items(&mut input.stream.as_sequential()).with_max_segment_size(self.options.max_message_size.into()) { if tx.send((i, item?)).is_none() { break; } @@ -272,6 +273,7 @@ impl App { fn sort(&self, inputs: Vec, output: &mut Output) -> Result<()> { let mut output = BufWriter::new(output); let indexer_settings = IndexerSettings::new( + LocalFileSystem, self.options.buffer_size.try_into()?, self.options.max_message_size.try_into()?, &self.options.fields.settings.predefined, @@ -499,14 +501,14 @@ impl App { let reader = scope.spawn(closure!(clone sfi, clone txi, |_| -> Result<()> { let scanner = Scanner::new(sfi.clone(), &self.options.delimiter); let mut meta = None; - if let InputReference::File(filename) = &input_ref { - meta = Some(fs::metadata(filename)?); + if let InputReference::File(path) = &input_ref { + meta = Some(fs::metadata(&path.canonical)?); } - let mut input = Some(input_ref.open_tail(self.options.tail)?); + let mut input = Some(input_ref.open()?.tail(self.options.tail)?); let is_file = |meta: &Option| meta.as_ref().map(|m|m.is_file()).unwrap_or(false); let process = |input: &mut Option, is_file: bool| { if let Some(input) = input { - for (j, item) in scanner.items(&mut input.stream).with_max_segment_size(self.options.max_message_size.into()).enumerate() { + for 
(j, item) in scanner.items(&mut input.stream.as_sequential()).with_max_segment_size(self.options.max_message_size.into()).enumerate() { if txi.send((i, j, item?)).is_err() { break; } @@ -516,14 +518,14 @@ impl App { Ok(false) } }; - if let InputReference::File(filename) = &input_ref { + if let InputReference::File(path) = &input_ref { if process(&mut input, is_file(&meta))? { return Ok(()) } - fsmon::run(vec![filename.clone()], |event| { + fsmon::run(vec![path.canonical.clone()], |event| { match event.kind { EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any | EventKind::Other => { - if let (Some(old_meta), Ok(new_meta)) = (&meta, fs::metadata(&filename)) { + if let (Some(old_meta), Ok(new_meta)) = (&meta, fs::metadata(&path.canonical)) { if old_meta.len() > new_meta.len() { input = None; } @@ -691,7 +693,7 @@ impl App { fn input_badges<'a, I: IntoIterator>(&self, inputs: I) -> Option> { let name = |input: &InputReference| match input { InputReference::Stdin => "".to_owned(), - InputReference::File(path) => path.to_string_lossy().to_string(), + InputReference::File(path) => path.original.to_string_lossy().to_string(), }; let mut badges = inputs.into_iter().map(|x| name(x).chars().collect_vec()).collect_vec(); diff --git a/src/error.rs b/src/error.rs index cb41211d..5c253ef6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -102,8 +102,6 @@ pub enum Error { }, #[error("invalid index header")] InvalidIndexHeader, - #[error("requested sorting of messages in {}-compressed file '{}' that is not currently supported", HILITE.paint(.format), HILITE.paint(.path.to_string_lossy()))] - UnsupportedFormatForIndexing { path: PathBuf, format: String }, #[error("failed to parse json: {0}")] JsonParseError(#[from] serde_json::Error), #[error("failed to parse logfmt: {0}")] diff --git a/src/index.rs b/src/index.rs index a5bee5c8..af5cbe05 100644 --- a/src/index.rs +++ b/src/index.rs @@ -10,14 +10,16 @@ // // std imports +#[cfg(test)] +use mockall::{automock, predicate::*}; 
use std::{ cmp::{max, min}, - convert::{TryFrom, TryInto}, + convert::{Into, TryFrom, TryInto}, fmt::{self, Display}, - fs::File, - io::{Read, Write}, + fs::{self}, + io::{self, Read, Write}, iter::empty, - num::NonZeroU32, + num::{NonZero, NonZeroU32}, path::{Path, PathBuf}, sync::Arc, time::{SystemTime, UNIX_EPOCH}, @@ -29,7 +31,9 @@ use closure::closure; use crossbeam_channel as channel; use crossbeam_channel::RecvError; use crossbeam_utils::thread; +use derive_more::{Deref, From}; use itertools::izip; +use nonzero_ext::nonzero; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -38,11 +42,11 @@ use crate::{ app::{InputFormat, UnixTimestampUnit}, error::{Error, Result}, index_capnp as schema, - input::Input, level::Level, model::{Parser, ParserSettings, RawRecord}, scanning::{Delimiter, Scanner, Segment, SegmentBuf, SegmentBufFactory}, settings::PredefinedFields, + vfs::{FileRead, FileSystem, LocalFileSystem}, }; // types @@ -126,20 +130,23 @@ impl std::ops::Sub for Timestamp { // --- -pub struct IndexerSettings<'a> { - buffer_size: NonZeroU32, - max_message_size: NonZeroU32, - fields: &'a PredefinedFields, - delimiter: Delimiter, - allow_prefix: bool, - unix_ts_unit: Option, - format: Option, +#[derive(Default)] +pub struct IndexerSettings<'a, FS> { + pub fs: FS, + pub buffer_size: BufferSize, + pub max_message_size: MessageSize, + pub fields: &'a PredefinedFields, + pub delimiter: Delimiter, + pub allow_prefix: bool, + pub unix_ts_unit: Option, + pub format: Option, } -impl<'a> IndexerSettings<'a> { +impl<'a, FS: FileSystem> IndexerSettings<'a, FS> { pub fn new( - buffer_size: NonZeroU32, - max_message_size: NonZeroU32, + fs: FS, + buffer_size: BufferSize, + max_message_size: MessageSize, fields: &'a PredefinedFields, delimiter: Delimiter, allow_prefix: bool, @@ -147,6 +154,7 @@ impl<'a> IndexerSettings<'a> { format: Option, ) -> Self { Self { + fs, buffer_size, max_message_size, fields, @@ -175,10 +183,69 @@ impl<'a> IndexerSettings<'a> { } } 
+#[derive(Deref, From, Serialize, Deserialize)] +pub struct BufferSize(NonZeroU32); + +impl Default for BufferSize { + fn default() -> Self { + Self(nonzero!(4 * 1024u32)) + } +} + +impl TryFrom> for BufferSize { + type Error = std::num::TryFromIntError; + + fn try_from(value: NonZero) -> std::result::Result { + Ok(Self(value.try_into()?)) + } +} + +impl From for NonZeroU32 { + fn from(value: BufferSize) -> NonZeroU32 { + value.0.into() + } +} + +impl From for u32 { + fn from(value: BufferSize) -> u32 { + value.0.into() + } +} + +#[derive(Deref, From, Serialize, Deserialize)] +pub struct MessageSize(NonZeroU32); + +impl Default for MessageSize { + fn default() -> Self { + Self(nonzero!(64 * 1024u32)) + } +} + +impl TryFrom> for MessageSize { + type Error = std::num::TryFromIntError; + + fn try_from(value: NonZero) -> std::result::Result { + Ok(Self(value.try_into()?)) + } +} + +impl From for NonZeroU32 { + fn from(value: MessageSize) -> NonZeroU32 { + value.0.into() + } +} + +impl From for u32 { + fn from(value: MessageSize) -> u32 { + value.0.into() + } +} + // --- /// Allows log files indexing to enable message sorting. -pub struct Indexer { +pub struct Indexer { + fs: FS, concurrency: usize, buffer_size: u32, max_message_size: u32, @@ -189,10 +256,14 @@ pub struct Indexer { format: Option, } -impl Indexer { +impl Indexer +where + FS::Metadata: SourceMetadata, +{ /// Returns a new Indexer with the given parameters. - pub fn new(concurrency: usize, dir: PathBuf, settings: IndexerSettings<'_>) -> Self { + pub fn new(concurrency: usize, dir: PathBuf, settings: IndexerSettings<'_, FS>) -> Self { Self { + fs: settings.fs, concurrency, buffer_size: settings.buffer_size.into(), max_message_size: settings.max_message_size.into(), @@ -212,14 +283,59 @@ impl Indexer { /// Builds index for the given file. /// /// Builds the index, saves it to disk and returns it. 
- pub fn index(&self, source_path: &PathBuf) -> Result { - let source_path = std::fs::canonicalize(source_path)?; - let meta = source_path.metadata()?; + pub fn index(&self, source_path: &Path) -> Result { + let (source_path, mut stream) = self.open_source(source_path)?; + let meta = Metadata::from(&stream.metadata()?)?; + let (index_path, index, actual) = self.prepare(&source_path, &meta)?; + if actual { + return Ok(index.unwrap()); + } + + self.build_index_from_stream(&mut stream, &source_path, &meta, &index_path, index) + } + + /// Builds index for the given file represended by a stream. + /// + /// The stream may be an uncompressed representation of the file. + /// The source_path parameter must be the canonical path of the file. + pub fn index_stream(&self, stream: &mut Reader, source_path: &Path, meta: &fs::Metadata) -> Result { + let meta = &meta.try_into()?; + let (index_path, index, actual) = self.prepare(source_path, meta)?; + if actual { + return Ok(index.unwrap()); + } + + self.build_index_from_stream(stream, source_path, meta, &index_path, index) + } + + /// Builds an in-memory index for the given stream. 
+ pub fn index_in_memory(&self, input: &mut Reader) -> Result { + self.process_file( + &PathBuf::from(""), + &Metadata { + len: 0, + modified: (0, 0), + }, + input, + &mut io::sink(), + None, + ) + } + + fn prepare(&self, source_path: &Path, meta: &Metadata) -> Result<(PathBuf, Option, bool)> { + assert_eq!(source_path, &self.fs.canonicalize(source_path)?); + let hash = hex::encode(sha256(source_path.to_string_lossy().as_bytes())); let index_path = self.dir.join(PathBuf::from(hash)); let mut existing_index = None; - if Path::new(&index_path).exists() { - let mut file = match File::open(&index_path) { + let mut actual = false; + + log::debug!("source path: {}", source_path.display()); + log::debug!("index file path: {}", index_path.display()); + log::debug!("source meta: size={} modified={:?}", meta.len, meta.modified); + + if self.fs.exists(&index_path)? { + let mut file = match self.fs.open(&index_path) { Ok(file) => file, Err(err) => { return Err(Error::FailedToOpenFileForReading { @@ -229,52 +345,29 @@ impl Indexer { } }; if let Ok(index) = Index::load(&mut file) { - if meta.len() == index.source().size && ts(meta.modified()?) == index.source().modified { - return Ok(index); + log::debug!( + "index stuff: size={} modified={:?}", + index.source().size, + index.source().modified + ); + if meta.len == index.source().size && meta.modified == index.source().modified { + actual = true; } - existing_index = Some(index) + existing_index = Some(index); } } - - self.build_index(&source_path, &index_path, existing_index) + Ok((index_path, existing_index, actual)) } - /// Builds index for the given stream. - /// - /// Builds the index and returns it. 
- pub fn index_from_stream(&self, input: &mut Reader) -> Result { - self.process_file( - &PathBuf::from(""), - Metadata { - len: 0, - modified: (0, 0), - }, - input, - &mut std::io::sink(), - None, - ) - } - - fn build_index(&self, source_path: &PathBuf, index_path: &PathBuf, existing_index: Option) -> Result { - let mut input = match Input::open(&source_path) { - Ok(input) => input, - Err(err) => { - return Err(Error::FailedToOpenFileForReading { - path: source_path.clone(), - source: err, - }); - } - }; - let metadata = match std::fs::metadata(&source_path) { - Ok(metadata) => metadata, - Err(err) => { - return Err(Error::FailedToGetFileMetadata { - path: source_path.clone(), - source: err, - }); - } - }; - let mut output = match File::create(&index_path) { + fn build_index_from_stream( + &self, + stream: &mut Reader, + source_path: &Path, + meta: &Metadata, + index_path: &PathBuf, + existing_index: Option, + ) -> Result { + let mut output = match self.fs.create(&index_path) { Ok(output) => output, Err(err) => { return Err(Error::FailedToOpenFileForWriting { @@ -283,19 +376,14 @@ impl Indexer { }); } }; - self.process_file( - &source_path, - (&metadata).try_into()?, - &mut input.stream, - &mut output, - existing_index, - ) + + self.process_file(source_path, meta, stream, &mut output, existing_index) } fn process_file( &self, - path: &PathBuf, - metadata: Metadata, + path: &Path, + metadata: &Metadata, input: &mut Reader, output: &mut Writer, existing_index: Option, @@ -503,6 +591,48 @@ impl Indexer { }) }) } + + fn open_source( + &self, + source_path: &Path, + ) -> io::Result<(PathBuf, Box + Send + Sync>)> { + let source_path = self.fs.canonicalize(source_path)?; + let result = self.fs.open(&source_path)?; + Ok((source_path, result)) + } +} + +// --- + +#[cfg_attr(test, automock)] +pub trait SourceMetadata { + fn len(&self) -> u64; + fn modified(&self) -> io::Result; +} + +impl SourceMetadata for fs::Metadata { + #[inline] + fn len(&self) -> u64 { + self.len() + 
} + + #[inline] + fn modified(&self) -> io::Result { + self.modified() + } +} + +#[cfg(test)] +impl SourceMetadata for crate::vfs::mem::Metadata { + #[inline] + fn len(&self) -> u64 { + self.len as u64 + } + + #[inline] + fn modified(&self) -> io::Result { + Ok(self.modified) + } } // --- @@ -913,22 +1043,46 @@ impl Header { // --- +#[derive(Clone, Copy, PartialEq, Eq, Debug)] struct Metadata { len: u64, modified: (i64, u32), } -impl TryFrom<&std::fs::Metadata> for Metadata { - type Error = std::io::Error; - - fn try_from(value: &std::fs::Metadata) -> std::io::Result { +impl Metadata { + pub fn from(source: &M) -> io::Result { Ok(Self { - len: value.len(), - modified: ts(value.modified()?), + len: source.len(), + modified: ts(source.modified()?), }) } } +impl TryFrom<&fs::Metadata> for Metadata { + type Error = io::Error; + + fn try_from(value: &fs::Metadata) -> io::Result { + Self::from(value) + } +} + +impl TryFrom for Metadata { + type Error = io::Error; + + fn try_from(value: fs::Metadata) -> io::Result { + Self::from(&value) + } +} + +#[cfg(test)] +impl TryFrom<&MockSourceMetadata> for Metadata { + type Error = io::Error; + + fn try_from(value: &MockSourceMetadata) -> io::Result { + Self::from(value) + } +} + // --- struct AsHex(T); @@ -1020,21 +1174,20 @@ const CURRENT_VERSION: u64 = 1; mod tests { use super::*; + use crate::vfs::{self, MockFileSystem}; + #[test] fn test_process_file_success() { - use std::io::Cursor; + use io::Cursor; let indexer = Indexer::new( 1, PathBuf::from("/tmp/cache"), - IndexerSettings::new( - NonZeroU32::new(1024).unwrap(), - NonZeroU32::new(1024).unwrap(), - &PredefinedFields::default(), - Delimiter::default(), - false, - None, - None, - ), + IndexerSettings { + fs: MockFileSystem::::new(), + buffer_size: nonzero!(1024u32).into(), + max_message_size: nonzero!(1024u32).into(), + ..Default::default() + }, ); let data = concat!( "ts=2023-12-04T10:01:07.091243+01:00 msg=msg1\n", @@ -1046,7 +1199,7 @@ mod tests { let index = indexer 
.process_file( &PathBuf::from("/tmp/test.log"), - Metadata { + &Metadata { len: data.len() as u64, modified: (1714739340, 0), }, @@ -1084,25 +1237,23 @@ mod tests { #[test] fn test_process_file_error() { - use std::io::Cursor; + use io::Cursor; + let fs = MockFileSystem::::new(); let indexer = Indexer::new( 1, PathBuf::from("/tmp/cache"), - IndexerSettings::new( - NonZeroU32::new(1024).unwrap(), - NonZeroU32::new(1024).unwrap(), - &PredefinedFields::default(), - Delimiter::default(), - false, - None, - None, - ), + IndexerSettings { + fs, + buffer_size: nonzero!(1024u32).into(), + max_message_size: nonzero!(1024u32).into(), + ..Default::default() + }, ); let mut input = FailingReader; let mut output = Cursor::new(Vec::new()); let result = indexer.process_file( &PathBuf::from("/tmp/test.log"), - Metadata { + &Metadata { len: 135, modified: (1714739340, 0), }, @@ -1114,11 +1265,44 @@ mod tests { assert_eq!(output.into_inner().len(), 0); } + #[test] + fn test_indexer() { + let fs = vfs::mem::FileSystem::new(); + + let data = br#"ts=2024-01-02T03:04:05Z msg="some test message""#; + let mut file = fs.create(&PathBuf::from("test.log")).unwrap(); + file.write_all(data).unwrap(); + + let indexer = Indexer::new( + 1, + PathBuf::from("/tmp/cache"), + IndexerSettings { + fs, + ..Default::default() + }, + ); + + let index1 = indexer.index(&PathBuf::from("test.log")).unwrap(); + + assert_eq!(index1.source.size, 47); + assert_eq!(index1.source.path, "/tmp/test.log"); + assert_eq!(index1.source.stat.lines_valid, 1); + assert_eq!(index1.source.stat.lines_invalid, 0); + assert_eq!(index1.source.stat.flags, schema::FLAG_HAS_TIMESTAMPS); + assert_eq!(index1.source.blocks.len(), 1); + + let index2 = indexer.index(&PathBuf::from("test.log")).unwrap(); + assert_eq!(index2.source.size, index1.source.size); + assert_eq!(index2.source.modified, index1.source.modified); + } + + // --- + struct FailingReader; impl Read for FailingReader { - fn read(&mut self, _: &mut [u8]) -> std::io::Result 
{ - Err(std::io::Error::new(std::io::ErrorKind::Other, "read error")) + fn read(&mut self, _: &mut [u8]) -> io::Result { + Err(io::Error::new(io::ErrorKind::Other, "read error")) } } } diff --git a/src/input.rs b/src/input.rs index a9487ee9..d63de4fb 100644 --- a/src/input.rs +++ b/src/input.rs @@ -1,202 +1,443 @@ // std imports -use std::cmp::min; -use std::convert::TryInto; -use std::fs::File; -use std::io::{self, stdin, BufReader, Read, Seek, SeekFrom}; -use std::mem::size_of_val; -use std::ops::Range; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::{ + cmp::min, + convert::TryInto, + fs::{self, File, Metadata}, + io::{self, stdin, BufRead, BufReader, Cursor, Read, Seek, SeekFrom, Write}, + mem::size_of_val, + ops::{Deref, Range}, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, +}; // third-party imports -use deko::bufread::AnyDecoder; +use deko::{bufread::AnyDecoder, Format}; use nu_ansi_term::Color; // local imports -use crate::error::Result; -use crate::index::{Index, Indexer, SourceBlock}; -use crate::iox::ReadFill; -use crate::pool::SQPool; -use crate::replay::{ReplayBufCreator, ReplayBufReader}; -use crate::tee::TeeReader; +use crate::{ + error::{Result, HILITE}, + index::{Index, Indexer, SourceBlock, SourceMetadata}, + iox::ReadFill, + pool::SQPool, + replay::{ReplayBufCreator, ReplayBufReader, ReplaySeekReader}, + tee::TeeReader, + vfs::{FileSystem, LocalFileSystem}, +}; // --- -pub type InputStream = Box; -pub type InputStreamFactory = Box Box + Send + Sync>; - -pub type InputSeekStream = Box>; - +pub type SequentialStream = Box; +pub type RandomAccessStream = Box; pub type BufPool = SQPool>; // --- -#[derive(Clone)] -pub enum InputReference { - Stdin, - File(PathBuf), +/// The path to an input file. +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct InputPath { + pub original: PathBuf, + pub canonical: PathBuf, +} + +impl InputPath { + /// Creates a new input path. 
+ pub fn new(original: PathBuf, canonical: PathBuf) -> Self { + Self { original, canonical } + } + + /// Resolves the canonical path for the given path. + pub fn resolve(original: PathBuf) -> io::Result { + Self::resolve_with_fs(original, LocalFileSystem) + } + + /// Resolves the canonical path for the given path using the specified file system. + pub fn resolve_with_fs(original: PathBuf, fs: FS) -> io::Result { + let canonical = fs.canonicalize(&original).map_err(|e| { + io::Error::new( + e.kind(), + format!( + "failed to resolve path for '{}': {}", + HILITE.paint(original.to_string_lossy()), + e + ), + ) + })?; + + Ok(Self { original, canonical }) + } + + /// Creates an ephemeral path. + pub fn ephemeral(original: PathBuf) -> Self { + Self { + original: original.clone(), + canonical: original, + } + } } -impl Into> for InputReference { - fn into(self) -> io::Result { - self.hold() +impl TryFrom for InputPath { + type Error = io::Error; + + fn try_from(original: PathBuf) -> io::Result { + Self::resolve(original) } } +// --- + +/// A reference to an input file or stdin. +#[derive(Clone, PartialEq, Eq, Debug)] +pub enum InputReference { + Stdin, + File(InputPath), +} + impl InputReference { + /// Preliminarily opens the input file to ensure it exists and is readable + /// and protect it from being suddenly deleted while we need it. 
pub fn hold(&self) -> io::Result { - Ok(InputHolder::new( - self.clone(), - match self { - InputReference::Stdin => None, - InputReference::File(path) => { - let meta = std::fs::metadata(path).map_err(|e| { - io::Error::new( - e.kind(), - format!("failed to get information on {}: {}", self.description(), e), - ) - })?; - if meta.is_dir() { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - format!("{} is a directory", self.description()), - )); - } - Some(Box::new(File::open(path).map_err(|e| { - io::Error::new(e.kind(), format!("failed to open {}: {}", self.description(), e)) - })?)) + let (reference, stream): (_, Option>) = match self { + Self::Stdin => (self.clone(), None), + Self::File(path) => { + let meta = fs::metadata(&path.canonical).map_err(|e| { + io::Error::new( + e.kind(), + format!("failed to get information on {}: {}", self.description(), e), + ) + })?; + if meta.is_dir() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("{} is a directory", self.description()), + )); } - }, - )) + let stream = + Box::new(File::open(&path.canonical).map_err(|e| { + io::Error::new(e.kind(), format!("failed to open {}: {}", self.description(), e)) + })?); + + (InputReference::File(path.clone()), Some(stream)) + } + }; + + Ok(InputHolder::new(reference, stream)) } + /// Completely opens the input for reading. + /// This includes decoding compressed files if needed. pub fn open(&self) -> io::Result { self.hold()?.open() } - pub fn open_tail(&self, n: u64) -> io::Result { + /// Returns a description of the input reference. 
+ pub fn description(&self) -> String { match self { - Self::Stdin => self.open(), - Self::File(path) => { - let mut file = File::open(path) - .map_err(|e| io::Error::new(e.kind(), format!("failed to open {}: {}", self.description(), e)))?; - Self::seek_tail(&mut file, n).ok(); - Ok(Input::new(self.clone(), Box::new(file))) - } + Self::Stdin => "".into(), + Self::File(path) => format!("file '{}'", Color::Yellow.paint(path.original.to_string_lossy())), } } - pub fn description(&self) -> String { + #[inline] + fn path(&self) -> Option<&PathBuf> { match self { - Self::Stdin => "".into(), - Self::File(filename) => format!("file '{}'", Color::Yellow.paint(filename.to_string_lossy())), + Self::Stdin => None, + Self::File(path) => Some(&path.canonical), } } +} - fn seek_tail(file: &mut File, lines: u64) -> io::Result<()> { - const BUF_SIZE: usize = 64 * 1024; - let mut scratch = [0; BUF_SIZE]; - let mut count: u64 = 0; - let mut prev_pos = file.seek(SeekFrom::End(0))?; - let mut pos = prev_pos; - while pos > 0 { - pos -= min(BUF_SIZE as u64, pos); - pos = file.seek(SeekFrom::Start(pos))?; - if pos == prev_pos { - break; - } - let bn = min(BUF_SIZE, (prev_pos - pos) as usize); - let buf = scratch[..bn].as_mut(); +// --- - file.read_exact(buf)?; +/// Meta information about the input. 
+pub trait Meta { + fn metadata(&self) -> io::Result>; +} - for i in (0..bn).rev() { - if buf[i] == b'\n' { - if count == lines { - file.seek(SeekFrom::Start(pos + i as u64 + 1))?; - return Ok(()); - } - count += 1; - } - } +impl Meta for &T { + #[inline] + fn metadata(&self) -> io::Result> { + (*self).metadata() + } +} - prev_pos = pos; - } - file.seek(SeekFrom::Start(pos as u64))?; - Ok(()) +impl Meta for &mut T { + #[inline] + fn metadata(&self) -> io::Result> { + (**self).metadata() + } +} + +impl Meta for fs::File { + #[inline] + fn metadata(&self) -> io::Result> { + self.metadata().map(Some) + } +} + +impl Meta for io::Stdin { + #[inline] + fn metadata(&self) -> io::Result> { + Ok(None) + } +} + +impl Meta for Cursor { + #[inline] + fn metadata(&self) -> io::Result> { + Ok(None) + } +} + +impl Meta for Mutex { + #[inline] + fn metadata(&self) -> io::Result> { + self.lock().unwrap().metadata() + } } // --- +/// A holder of an input file. +/// It can be used to ensure the input file is not suddenly deleted while it is needed. pub struct InputHolder { pub reference: InputReference, - pub stream: Option>, + pub stream: Option>, } impl InputHolder { - pub fn new(reference: InputReference, stream: Option>) -> Self { + /// Creates a new input holder. + pub fn new(reference: InputReference, stream: Option>) -> Self { Self { reference, stream } } + /// Opens the input file for reading. + /// This includes decoding compressed files if needed. 
pub fn open(self) -> io::Result { - match self.reference { - InputReference::Stdin => Ok(Input::new(self.reference.clone(), self.stdin())), - InputReference::File(path) => match self.stream { - Some(stream) => Input::open_stream(&path, stream), - None => Input::open(&path), - }, - } + Ok(Input::new(self.reference.clone(), self.stream()?)) } - pub fn index(self, indexer: &Indexer) -> Result { - match self.reference { - InputReference::Stdin => IndexedInput::open_sequential(self.reference.clone(), self.stdin(), indexer), - InputReference::File(path) => match self.stream { - Some(stream) => IndexedInput::open_stream(&path, stream, indexer), - None => IndexedInput::open(&path, indexer), + /// Indexes the input file and returns IndexedInput that can be used to access the data in random order. + pub fn index(self, indexer: &Indexer) -> Result + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + self.open()?.indexed(indexer) + } + + fn stream(self) -> io::Result { + Ok(match &self.reference { + InputReference::Stdin => match self.stream { + Some(stream) => Stream::Sequential(Stream::RandomAccess(stream).into_sequential()), + None => Stream::Sequential(self.stdin()), }, - } + InputReference::File(_) => match self.stream { + Some(stream) => Stream::RandomAccess(stream), + None => Stream::RandomAccess(self.reference.hold()?.stream.unwrap()), + }, + }) } - fn stdin(self) -> InputStream { + fn stdin(self) -> SequentialStream { self.stream - .map(|s| Box::new(ReadSeekToRead(s)) as InputStream) + .map(|s| Box::new(StreamOver(s)) as SequentialStream) .unwrap_or_else(|| Box::new(stdin())) } } +/// Represents already opened and decoded input file or stdin. 
pub struct Input { pub reference: InputReference, - pub stream: InputStream, + pub stream: Stream, } impl Input { - pub fn new(reference: InputReference, stream: InputStream) -> Self { + fn new(reference: InputReference, stream: Stream) -> Self { Self { reference: reference.clone(), - stream: Box::new(WrappedInputStream { reference, stream }), + stream: stream.verified().decoded().tagged(reference), } } + /// Indexes the input file and returns IndexedInput that can be used to access the data in random order. + pub fn indexed(self, indexer: &Indexer) -> Result + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + IndexedInput::from_stream(self.reference, self.stream, indexer) + } + + /// Opens the file for reading. + /// This includes decoding compressed files if needed. pub fn open(path: &PathBuf) -> io::Result { - InputReference::File(path.clone()).hold()?.open() + InputReference::File(path.clone().try_into()?).open() + } + + /// Opens the stdin for reading. + pub fn stdin() -> io::Result { + InputReference::Stdin.open() + } + + pub fn tail(mut self, lines: u64) -> io::Result { + match &mut self.stream { + Stream::Sequential(_) => (), + Stream::RandomAccess(stream) => Self::seek_tail(stream, lines)?, + } + Ok(self) + } + + fn seek_tail(stream: &mut RandomAccessStream, lines: u64) -> io::Result<()> { + const BUF_SIZE: usize = 64 * 1024; + let mut scratch = [0; BUF_SIZE]; + let mut count: u64 = 0; + let mut pos = stream.seek(SeekFrom::End(0))?; + while pos != 0 { + let n = min(BUF_SIZE as u64, pos); + pos -= n; + pos = stream.seek(SeekFrom::Start(pos))?; + let bn = n as usize; + let buf = scratch[..bn].as_mut(); + + stream.read_exact(buf)?; + + for i in (0..bn).rev() { + if buf[i] == b'\n' { + if count == lines { + stream.seek(SeekFrom::Start(pos + i as u64 + 1))?; + return Ok(()); + } + count += 1; + } + } + } + stream.seek(SeekFrom::Start(pos as u64))?; + Ok(()) + } +} + +// --- + +/// Stream of input data. 
+/// It can be either sequential or supporting random access. +pub enum Stream { + Sequential(SequentialStream), + RandomAccess(RandomAccessStream), +} + +impl Stream { + /// Verifies if the stream supports random access. + /// If not, converts it to a sequential stream. + pub fn verified(self) -> Self { + match self { + Self::Sequential(stream) => Self::Sequential(stream), + Self::RandomAccess(stream) => { + let mut stream = stream; + if stream.seek(SeekFrom::Current(0)).is_err() { + Self::Sequential(Box::new(stream)) + } else { + Self::RandomAccess(stream) + } + } + } + } + + /// Decodes the stream if needed. + pub fn decoded(self) -> Self { + match self { + Self::Sequential(stream) => { + let meta = stream.metadata().ok().flatten(); + Self::Sequential(Box::new(AnyDecoder::new(BufReader::new(stream)).with_metadata(meta))) + } + Self::RandomAccess(mut stream) => { + if let Some(pos) = stream.seek(SeekFrom::Current(0)).ok() { + log::debug!("detecting format of random access stream"); + let meta = stream.metadata().ok().flatten(); + let kind = AnyDecoder::new(BufReader::new(&mut stream)).kind().ok(); + log::debug!("format detected: {:?}", &kind); + stream.seek(SeekFrom::Start(pos)).ok(); + match kind { + Some(Format::Verbatim) => { + return Self::RandomAccess(stream); + } + Some(_) => { + log::debug!("creating decoder"); + let dec = AnyDecoder::new(BufReader::new(stream)); + log::debug!("decoder created"); + return Self::Sequential(Box::new(dec.with_metadata(meta))); + } + None => (), + } + } + Self::Sequential(Box::new(stream)) + } + } + } + + /// Converts the stream to a sequential stream. + pub fn as_sequential<'a>(&'a mut self) -> StreamOver<&'a mut (dyn ReadMeta + Send + Sync)> { + match self { + Self::Sequential(stream) => StreamOver(stream), + Self::RandomAccess(stream) => StreamOver(stream), + } + } + + /// Converts the stream to a sequential stream. 
+ pub fn into_sequential(self) -> SequentialStream { + match self { + Self::Sequential(stream) => stream, + Self::RandomAccess(stream) => Box::new(StreamOver(stream)), + } + } + + /// Adds context to the returned errors. + pub fn tagged(self, reference: InputReference) -> Self { + match self { + Self::Sequential(stream) => Self::Sequential(Box::new(TaggedStream { reference, stream })), + Self::RandomAccess(stream) => Self::RandomAccess(Box::new(TaggedStream { reference, stream })), + } + } +} + +impl Read for Stream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self { + Self::Sequential(stream) => stream.read(buf), + Self::RandomAccess(stream) => stream.read(buf), + } } +} - pub fn open_stream(path: &PathBuf, stream: Box) -> io::Result { - let stream: InputStream = Box::new(AnyDecoder::new(BufReader::new(stream))); - Ok(Self::new(InputReference::File(path.clone()), stream)) +impl Meta for Stream { + #[inline] + fn metadata(&self) -> io::Result> { + match self { + Self::Sequential(stream) => stream.metadata(), + Self::RandomAccess(stream) => stream.metadata(), + } } } // --- -pub struct WrappedInputStream { +/// A wrapper around a stream that adds context to the returned errors. 
+pub struct TaggedStream { reference: InputReference, - stream: InputStream, + stream: R, } -impl Read for WrappedInputStream { +impl Deref for TaggedStream { + type Target = R; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.stream + } +} + +impl Read for TaggedStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.stream.read(buf).map_err(|e| { io::Error::new( @@ -207,58 +448,134 @@ impl Read for WrappedInputStream { } } +impl Seek for TaggedStream { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.stream.seek(pos).map_err(|e| { + io::Error::new( + e.kind(), + format!("failed to seek {}: {}", self.reference.description(), e), + ) + }) + } +} + +impl Meta for TaggedStream { + #[inline] + fn metadata(&self) -> io::Result> { + self.stream.metadata() + } +} + // --- pub struct IndexedInput { pub reference: InputReference, - pub stream: InputSeekStream, + pub stream: Mutex, pub index: Index, } impl IndexedInput { - pub fn new(reference: InputReference, stream: InputSeekStream, index: Index) -> Self { + #[inline] + fn new(reference: InputReference, stream: RandomAccessStream, index: Index) -> Self { Self { reference, - stream, + stream: Mutex::new(stream), index, } } - pub fn open(path: &PathBuf, indexer: &Indexer) -> Result { - InputReference::File(path.clone()).hold()?.index(indexer) + /// Opens the input file and indexes it. + pub fn open(path: &Path, indexer: &Indexer) -> Result + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + InputReference::File(PathBuf::from(path).try_into()?) + .hold()? + .index(indexer) } - pub fn open_stream(path: &PathBuf, mut stream: Box, indexer: &Indexer) -> Result { - if stream.seek(SeekFrom::Current(0)).is_err() { - return Self::open_sequential( - InputReference::File(path.clone()), - Box::new(stream.as_input_stream()), - indexer, - ); + /// Converts the input to blocks. 
+ pub fn into_blocks(self) -> Blocks> { + let n = self.index.source().blocks.len(); + Blocks::new(Arc::new(self), (0..n).into_iter()) + } + + fn from_stream(reference: InputReference, stream: Stream, indexer: &Indexer) -> Result + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + let (stream, index) = Self::index_stream(&reference, stream, indexer)?; + Ok(Self::new(reference, stream, index)) + } + + fn index_stream( + reference: &InputReference, + stream: Stream, + indexer: &Indexer, + ) -> Result<(RandomAccessStream, Index)> + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + log::info!("indexing {}", reference.description()); + + if let (Some(path), Some(meta)) = (reference.path(), stream.metadata()?) { + match stream { + Stream::Sequential(stream) => Self::index_sequential_stream(path, &meta, stream, indexer), + Stream::RandomAccess(stream) => Self::index_random_access_stream(path, &meta, stream, indexer), + } + } else { + let mut tee = TeeReader::new(stream, ReplayBufCreator::new()); + let index = indexer.index_in_memory(&mut tee)?; + let buf = tee.into_writer().result()?; + let stream = Box::new(ReplayBufReader::new(buf).with_metadata(None)); + + Ok((stream, index)) } + } - let index = indexer.index(&path)?; - Ok(Self::new( - InputReference::File(path.clone()), - Box::new(Mutex::new(stream)), - index, - )) + fn index_random_access_stream( + path: &Path, + meta: &Metadata, + mut stream: RandomAccessStream, + indexer: &Indexer, + ) -> Result<(RandomAccessStream, Index)> + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + let pos = stream.seek(SeekFrom::Current(0))?; + let index = indexer.index_stream(&mut stream, path, meta)?; + + stream.seek(SeekFrom::Start(pos))?; + + Ok((stream, index)) } - pub fn open_sequential(reference: InputReference, stream: InputStream, indexer: &Indexer) -> Result { + fn index_sequential_stream( + path: &PathBuf, + meta: &Metadata, + stream: SequentialStream, + indexer: 
&Indexer, + ) -> Result<(RandomAccessStream, Index)> + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { let mut tee = TeeReader::new(stream, ReplayBufCreator::new()); - let index = indexer.index_from_stream(&mut tee)?; - let buf = tee.into_writer().result()?; - Ok(IndexedInput::new( - reference, - Box::new(Mutex::new(ReplayBufReader::new(buf))), - index, - )) - } + let index = indexer.index_stream(&mut tee, path, meta)?; + let meta = meta.clone(); + + let stream: RandomAccessStream = if tee.processed() == 0 { + Box::new(ReplaySeekReader::new(tee.into_reader()).with_metadata(Some(meta))) + } else { + let buf = tee.into_writer().result()?; + Box::new(ReplayBufReader::new(buf).with_metadata(Some(meta))) + }; - pub fn into_blocks(self) -> Blocks> { - let n = self.index.source().blocks.len(); - Blocks::new(Arc::new(self), (0..n).into_iter()) + Ok((stream, index)) } } @@ -270,6 +587,7 @@ pub struct Blocks { } impl> Blocks { + #[inline] pub fn new(input: Arc, indexes: II) -> Self { Self { input, indexes } } @@ -285,18 +603,22 @@ impl> Blocks { impl> Iterator for Blocks { type Item = Block; + #[inline] fn next(&mut self) -> Option { self.indexes.next().map(|i| Block::new(self.input.clone(), i)) } + #[inline] fn size_hint(&self) -> (usize, Option) { self.indexes.size_hint() } + #[inline] fn count(self) -> usize { self.indexes.count() } + #[inline] fn nth(&mut self, n: usize) -> Option { self.indexes.nth(n).map(|i| Block::new(self.input.clone(), i)) } @@ -311,6 +633,7 @@ pub struct Block { } impl Block { + #[inline] pub fn new(input: Arc, index: usize) -> Self { Self { input, @@ -319,6 +642,7 @@ impl Block { } } + #[inline] pub fn with_buf_pool(self, buf_pool: Arc) -> Self { Self { input: self.input, @@ -327,22 +651,27 @@ impl Block { } } + #[inline] pub fn into_lines(self) -> Result> { BlockLines::new(self) } + #[inline] pub fn offset(&self) -> u64 { self.source_block().offset } + #[inline] pub fn size(&self) -> u32 { self.source_block().size } + #[inline] 
pub fn source_block(&self) -> &SourceBlock { &self.input.index.source().blocks[self.index] } + #[inline] pub fn lines_valid(&self) -> u64 { self.source_block().stat.lines_valid } @@ -420,11 +749,13 @@ impl Iterator for BlockLines { Some(BlockLine::new(self.buf.clone(), offset..offset + l)) } + #[inline] fn size_hint(&self) -> (usize, Option) { let count = self.total - self.current; (count, Some(count)) } + #[inline] fn count(self) -> usize { self.size_hint().0 } @@ -438,18 +769,22 @@ pub struct BlockLine { } impl BlockLine { + #[inline] pub fn new(buf: Arc>, range: Range) -> Self { Self { buf, range } } + #[inline] pub fn bytes(&self) -> &[u8] { &self.buf[self.range.clone()] } + #[inline] pub fn offset(&self) -> usize { self.range.start } + #[inline] pub fn len(&self) -> usize { self.range.end - self.range.start } @@ -463,6 +798,7 @@ pub struct ConcatReader { } impl ConcatReader { + #[inline] pub fn new(iter: I) -> Self { Self { iter, item: None } } @@ -503,66 +839,269 @@ where // --- pub trait ReadSeek: Read + Seek {} +pub trait ReadSeekMeta: ReadSeek + Meta {} +pub trait ReadMeta: Read + Meta {} +pub trait BufReadSeek: BufRead + Seek {} impl ReadSeek for T {} -pub struct ReadSeekToRead(T); +impl ReadSeekMeta for T {} -impl Read for ReadSeekToRead -where - T: ReadSeek, -{ +impl ReadMeta for T {} + +impl Meta for Box { + #[inline] + fn metadata(&self) -> io::Result> { + self.as_ref().metadata() + } +} + +pub struct StreamOver(T); + +impl Deref for StreamOver { + type Target = T; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Read for StreamOver { + #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } } -trait AsInputStream { - fn as_input_stream(self) -> InputStream; +impl Seek for StreamOver { + #[inline] + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.0.seek(pos) + } } -impl AsInputStream for T { - fn as_input_stream(self) -> InputStream { - Box::new(self) +impl Meta for StreamOver { + #[inline] + 
fn metadata(&self) -> io::Result> { + self.0.metadata() } } +// --- + +trait WithMeta { + fn with_metadata(self, meta: Option) -> WithMetadata + where + Self: Sized; +} + +impl WithMeta for T { + #[inline] + fn with_metadata(self, meta: Option) -> WithMetadata { + WithMetadata::new(self, meta) + } +} + +// --- + +struct WithMetadata { + inner: T, + meta: Option, +} + +impl WithMetadata { + #[inline] + fn new(inner: T, meta: Option) -> Self { + Self { inner, meta } + } +} + +impl Read for WithMetadata { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.read(buf) + } +} + +impl Write for WithMetadata { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl Seek for WithMetadata { + #[inline] + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.inner.seek(pos) + } +} + +impl Meta for WithMetadata { + #[inline] + fn metadata(&self) -> io::Result> { + Ok(self.meta.clone()) + } +} + +// --- + #[cfg(test)] mod tests { use super::*; - use std::io::ErrorKind; - use std::io::Read; + use io::Read; + + use itertools::Itertools; + use nonzero_ext::nonzero; + + use crate::{ + index::IndexerSettings, + vfs::{self, LocalFileSystem}, + }; + + #[test] + fn test_input_reference() { + let reference = InputReference::Stdin; + assert_eq!(reference.description(), ""); + assert_eq!(reference.path(), None); + let input = reference.open().unwrap(); + assert_eq!(input.reference, reference); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from("test.log"))); + assert_eq!(reference.description(), "file '\u{1b}[33mtest.log\u{1b}[0m'"); + assert_eq!(reference.path(), Some(&PathBuf::from("test.log"))); + } + + #[test] + fn test_input_holder() { + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from("sample/test.log"))); + let holder = InputHolder::new(reference, None); + let mut stream = 
holder.stream().unwrap(); + let mut buf = Vec::new(); + let n = stream.read_to_end(&mut buf).unwrap(); + assert!(matches!(stream, Stream::RandomAccess(_))); + let stream = stream.as_sequential(); + let meta = stream.metadata().unwrap(); + assert_eq!(meta.is_some(), true); + assert_eq!(n, 147); + assert_eq!(buf.len(), n); + } + + #[test] + fn test_input() { + let input = Input::stdin().unwrap(); + assert!(matches!(input.stream, Stream::Sequential(_))); + assert_eq!(input.reference.description(), ""); + let input = Input::open(&PathBuf::from("sample/prometheus.log")).unwrap(); + assert!(matches!(input.stream, Stream::RandomAccess(_))); + assert_eq!( + input.reference.description(), + "file '\u{1b}[33msample/prometheus.log\u{1b}[0m'" + ); + } + + #[test] + fn test_input_tail() { + let input = Input::stdin().unwrap().tail(1).unwrap(); + assert!(matches!(input.stream, Stream::Sequential(_))); + + for &(filename, requested, expected) in &[ + ("sample/test.log", 1, 1), + ("sample/test.log", 2, 2), + ("sample/test.log", 3, 2), + ("sample/prometheus.log", 2, 2), + ] { + let input = Input::open(&PathBuf::from(filename)).unwrap().tail(requested).unwrap(); + let mut buf = Vec::new(); + let n = input.stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert!(n > 0); + assert_eq!(buf.lines().count(), expected); + } + } + + #[test] + fn test_stream() { + let stream = Stream::Sequential(Box::new(Cursor::new(b"test"))); + let stream = stream.verified().decoded().tagged(InputReference::Stdin); + assert!(matches!(stream, Stream::Sequential(_))); + let mut buf = Vec::new(); + let n = stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf, b"test"); + + let stream = Stream::RandomAccess(Box::new(UnseekableReader(Cursor::new(b"test")))); + let stream = stream.tagged(InputReference::Stdin).verified(); + assert!(matches!(stream, Stream::Sequential(_))); + let mut buf = Vec::new(); + let n = stream.into_sequential().read_to_end(&mut 
buf).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf, b"test"); + + let stream = Stream::RandomAccess(Box::new(UnseekableReader(Cursor::new(b"test")))); + assert!(matches!(stream.metadata(), Ok(None))); + let stream = stream.tagged(InputReference::Stdin).decoded(); + assert!(matches!(stream, Stream::Sequential(_))); + assert!(matches!(stream.metadata(), Ok(None))); + let mut buf = Vec::new(); + let n = stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf, b"test"); + + // echo 't' | gzip -cf | xxd -p | sed 's/\(..\)/\\x\1/g' + let data = b"\x1f\x8b\x08\x00\x03\x87\x55\x67\x00\x03\x2b\xe1\x02\x00\x13\x47\x5f\xea\x02\x00\x00\x00"; + let stream = Stream::RandomAccess(Box::new(Cursor::new(data).with_metadata(None))); + let stream = stream.tagged(InputReference::Stdin).decoded(); + assert!(matches!(stream, Stream::Sequential(_))); + let mut buf = Vec::new(); + let n = stream.into_sequential().read_to_end(&mut buf).unwrap(); + assert_eq!(n, 2); + assert_eq!(buf, b"t\n"); + + let stream = Stream::RandomAccess(Box::new(FailingReader)); + let stream = stream.tagged(InputReference::Stdin).decoded(); + assert!(matches!(stream, Stream::Sequential(_))); + let mut buf = [0; 128]; + let result = stream.into_sequential().read(&mut buf); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::Other); + } #[test] fn test_input_read_error() { - let reference = InputReference::File(PathBuf::from("test.log")); - let mut input = Input::new(reference, Box::new(FailingReader)); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from("test.log"))); + let input = Input::new(reference, Stream::Sequential(Box::new(FailingReader))); let mut buf = [0; 128]; - let result = input.stream.read(&mut buf); + let result = input.stream.into_sequential().read(&mut buf); assert!(result.is_err()); let err = result.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::Other); + assert_eq!(err.kind(), 
io::ErrorKind::Other); assert_eq!(err.to_string().contains("test.log"), true); } #[test] fn test_input_hold_error_is_dir() { - let reference = InputReference::File(PathBuf::from(".")); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from("."))); let result = reference.hold(); assert!(result.is_err()); let err = result.err().unwrap(); - assert_eq!(err.kind(), ErrorKind::InvalidInput); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); assert_eq!(err.to_string().contains("is a directory"), true); } #[test] fn test_input_hold_error_not_found() { let filename = "AKBNIJGHERHBNMCKJABHSDJ"; - let reference = InputReference::File(PathBuf::from(filename)); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from(filename))); let result = reference.hold(); assert!(result.is_err()); let err = result.err().unwrap(); - assert_eq!(err.kind(), ErrorKind::NotFound); + assert_eq!(err.kind(), io::ErrorKind::NotFound); assert_eq!(err.to_string().contains(filename), true); } @@ -573,19 +1112,139 @@ mod tests { // echo 'test' | gzip -cf | xxd -p | sed 's/\(..\)/\\x\1/g' b"\x1f\x8b\x08\x00\x9e\xdd\x48\x67\x00\x03\x2b\x49\x2d\x2e\xe1\x02\x00\xc6\x35\xb9\x3b\x05\x00\x00\x00", ); - let mut input = Input::open_stream(&PathBuf::from("test.log.gz"), Box::new(data)).unwrap(); + let mut stream = Stream::Sequential(Box::new(data)).verified().decoded(); let mut buf = Vec::new(); - let result = input.stream.read_to_end(&mut buf); + let result = stream.read_to_end(&mut buf); assert!(result.is_ok()); assert_eq!(result.unwrap(), 5); assert_eq!(buf, b"test\n"); } + #[test] + fn test_indexed_input_stdin() { + let data = br#"{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"}\n"#; + let stream = Stream::RandomAccess(Box::new(Cursor::new(data))); + let indexer = Indexer::::new(1, PathBuf::new(), IndexerSettings::default()); + let input = IndexedInput::from_stream(InputReference::Stdin, stream, &indexer).unwrap(); + let mut blocks = 
input.into_blocks().collect_vec(); + assert_eq!(blocks.len(), 1); + let block = blocks.drain(..).next().unwrap(); + assert_eq!(block.lines_valid(), 1); + let mut lines = block.into_lines().unwrap().collect_vec(); + let line = lines.drain(..).next().unwrap(); + assert_eq!(line.bytes(), data); + } + + #[test] + fn test_indexed_input_file_random_access() { + let fs = Arc::new(vfs::mem::FileSystem::new()); + + for _ in 0..2 { + let path = PathBuf::from("sample/test.log"); + let indexer = Indexer::new( + 1, + PathBuf::from("."), + IndexerSettings { + fs: fs.clone(), + buffer_size: nonzero!(64u32).into(), + ..Default::default() + }, + ); + let input = IndexedInput::open(&path, &indexer).unwrap(); + let mut blocks = input.into_blocks().sorted().collect_vec(); + assert_eq!(blocks.len(), 2); + assert_eq!(blocks[0].lines_valid(), 1); + assert_eq!(blocks[0].size(), 74); + assert_eq!(blocks[1].lines_valid(), 1); + assert_eq!(blocks[1].size(), 73); + let lines = blocks.pop().unwrap().into_lines().unwrap().collect_vec(); + assert_eq!(lines.len(), 1); + assert_eq!(lines[0].len(), 73); + let lines = blocks.pop().unwrap().into_lines().unwrap().collect_vec(); + assert_eq!(lines.len(), 1); + assert_eq!(lines[0].len(), 74); + } + } + + #[test] + fn test_indexed_input_sequential_access() { + let fs = Arc::new(vfs::mem::FileSystem::new()); + + for _ in 0..2 { + let path = PathBuf::from("sample/test.log"); + let indexer = Indexer::new( + 1, + PathBuf::from("/tmp/cache"), + IndexerSettings { + fs: fs.clone(), + buffer_size: nonzero!(64u32).into(), + ..Default::default() + }, + ); + let reference = InputReference::File(InputPath::resolve_with_fs(path.clone(), &fs).unwrap()); + let stream = Stream::Sequential(Box::new(File::open(&path).unwrap())); + let input = IndexedInput::from_stream(reference, stream, &indexer).unwrap(); + let mut blocks = input.into_blocks().sorted().collect_vec(); + assert_eq!(blocks.len(), 2); + assert_eq!(blocks[0].lines_valid(), 1); + assert_eq!(blocks[0].size(), 
74); + assert_eq!(blocks[1].lines_valid(), 1); + assert_eq!(blocks[1].size(), 73); + let lines = blocks.pop().unwrap().into_lines().unwrap().collect_vec(); + assert_eq!(lines.len(), 1); + assert_eq!(lines[0].len(), 73); + let lines = blocks.pop().unwrap().into_lines().unwrap().collect_vec(); + assert_eq!(lines.len(), 1); + assert_eq!(lines[0].len(), 74); + } + } + + // --- + struct FailingReader; impl Read for FailingReader { - fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { - Err(std::io::Error::new(std::io::ErrorKind::Other, "read error")) + fn read(&mut self, _buf: &mut [u8]) -> io::Result { + Err(io::Error::new(io::ErrorKind::Other, "read error")) + } + } + + impl Seek for FailingReader { + fn seek(&mut self, from: SeekFrom) -> io::Result { + match from { + SeekFrom::Start(0) => Ok(0), + SeekFrom::Current(0) => Ok(0), + SeekFrom::End(0) => Ok(0), + _ => Err(io::Error::new(io::ErrorKind::Other, "seek error")), + } + } + } + + impl Meta for FailingReader { + fn metadata(&self) -> std::io::Result> { + Ok(None) + } + } + + // --- + + struct UnseekableReader(R); + + impl Read for UnseekableReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.read(buf) + } + } + + impl Seek for UnseekableReader { + fn seek(&mut self, _: SeekFrom) -> io::Result { + Err(io::Error::new(io::ErrorKind::Other, "seek error")) + } + } + + impl Meta for UnseekableReader { + fn metadata(&self) -> io::Result> { + Ok(None) } } } diff --git a/src/lib.rs b/src/lib.rs index 0eb2afcd..7b287f48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ mod replay; mod scanning; mod serdex; mod tee; +mod vfs; // conditional public modules #[cfg_attr(unix, path = "signal_unix.rs")] diff --git a/src/main.rs b/src/main.rs index 0e3a829d..008edaa4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,8 +11,8 @@ use std::{ // third-party imports use chrono::Utc; use clap::{CommandFactory, Parser}; +use env_logger::fmt::TimestampPrecision; use itertools::Itertools; -use phf::phf_map; 
// local imports use hl::{ @@ -29,14 +29,6 @@ use hl::{ Delimiter, {IncludeExcludeKeyFilter, KeyMatchOptions}, }; -// supported compression formats -const COMPRESSION_FORMATS: phf::Map<&str, &str> = phf_map! { - "bz2" => "bzip2", - "gz" => "gzip", - "xz" => "xz", - "zst" => "zstd" -}; - // --- fn run() -> Result<()> { @@ -48,6 +40,13 @@ fn run() -> Result<()> { return cli::Opt::command().print_help().map_err(Error::Io); } + if opt.debug { + env_logger::builder() + .format_timestamp(Some(TimestampPrecision::Micros)) + .init(); + log::debug!("logging initialized"); + } + if let Some(shell) = opt.shell_completions { let mut cmd = cli::Opt::command(); let name = cmd.get_name().to_string(); @@ -246,18 +245,18 @@ fn run() -> Result<()> { flatten: opt.flatten != cli::FlattenOption::Never, }); - // Configure input. + // Configure the input. let mut inputs = opt .files .iter() .map(|x| { if x.to_str() == Some("-") { - InputReference::Stdin + Ok::<_, std::io::Error>(InputReference::Stdin) } else { - InputReference::File(x.clone()) + Ok(InputReference::File(x.clone().try_into()?)) } }) - .collect::>(); + .collect::, _>>()?; if inputs.len() == 0 { if stdin().is_terminal() { let mut cmd = cli::Opt::command(); @@ -266,21 +265,8 @@ fn run() -> Result<()> { inputs.push(InputReference::Stdin); } - if opt.sort || opt.follow { - for input in &inputs { - if let InputReference::File(path) = input { - if let Some(Some(ext)) = path.extension().map(|x| x.to_str()) { - if let Some(&format) = COMPRESSION_FORMATS.get(ext) { - return Err(Error::UnsupportedFormatForIndexing { - path: path.clone(), - format: format.into(), - }); - } - } - } - } - } - + let n = inputs.len(); + log::debug!("hold {n} inputs"); let inputs = inputs .into_iter() .map(|input| input.hold().map_err(Error::Io)) @@ -313,6 +299,8 @@ fn run() -> Result<()> { } }; + log::debug!("run the app"); + // Run the app. 
let run = || match app.run(inputs, output.as_mut()) { Ok(()) => Ok(()), diff --git a/src/replay.rs b/src/replay.rs index 2f1a4d28..a043a5a2 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -26,14 +26,24 @@ type Buf = Vec; // --- +/// A trait that allows caching data for a given key. pub trait Cache { type Key: Copy + Clone + Eq + PartialEq + Ord + PartialOrd + Hash; fn cache Result>(&mut self, key: Self::Key, f: F) -> Result<&[u8]>; } +impl Cache for &mut C { + type Key = C::Key; + + fn cache Result>(&mut self, key: Self::Key, f: F) -> Result<&[u8]> { + (**self).cache(key, f) + } +} + // --- +/// A buffer that holds compressed segments of continuous data. pub struct ReplayBuf { segment_size: NonZeroUsize, segments: Vec, @@ -58,24 +68,42 @@ impl TryFrom for ReplayBuf { } } +impl ReplayBufRead for ReplayBuf { + fn size(&self) -> usize { + self.size + } + + fn segment_size(&self) -> NonZeroUsize { + self.segment_size + } + + fn segments(&self) -> &Vec { + &self.segments + } +} + // --- +/// A creator for ReplayBuf. pub struct ReplayBufCreator { buf: ReplayBuf, scratch: ReusableBuf, } impl ReplayBufCreator { + /// Create a new ReplayBufCreator with the default options. pub fn new() -> Self { - Self::build().result() + Self::build().done() } + /// Create a builder for ReplayBufCreator with the default options. pub fn build() -> ReplayBufCreatorBuilder { ReplayBufCreatorBuilder { segment_size: DEFAULT_SEGMENT_SIZE.unwrap(), } } + /// Returns the created ReplayBuf. 
pub fn result(mut self) -> Result { self.flush()?; Ok(self.buf) @@ -112,34 +140,32 @@ impl Write for ReplayBufCreator { fn flush(&mut self) -> Result<()> { if self.scratch.len() != 0 { - let buf = self.scratch.clear(); + let buf = self.scratch.bytes(); self.buf.segments.push(CompressedBuf::try_from(buf)?); self.buf.size += buf.len(); + self.scratch.clear(); } Ok(()) } } -impl From for ReplayBufCreator { - fn from(builder: ReplayBufCreatorBuilder) -> Self { - builder.result() - } -} - // --- +/// A builder for ReplayBufCreator. pub struct ReplayBufCreatorBuilder { segment_size: NonZeroUsize, } impl ReplayBufCreatorBuilder { + /// Set the segment size for the ReplayBuf. #[allow(dead_code)] pub fn segment_size(mut self, segment_size: NonZeroUsize) -> Self { self.segment_size = segment_size; self } - pub fn result(self) -> ReplayBufCreator { + /// Create a new ReplayBufCreator with the current configuration. + pub fn done(self) -> ReplayBufCreator { ReplayBufCreator { buf: ReplayBuf::new(self.segment_size), scratch: ReusableBuf::new(self.segment_size.get()), @@ -149,18 +175,44 @@ impl ReplayBufCreatorBuilder { // --- -pub struct ReplayBufReader { - buf: ReplayBuf, +/// A trait that allows reading from a ReplayBuf. +pub trait ReplayBufRead { + fn segment_size(&self) -> NonZeroUsize; + fn size(&self) -> usize; + fn segments(&self) -> &Vec; +} + +impl ReplayBufRead for &B { + fn segment_size(&self) -> NonZeroUsize { + (**self).segment_size() + } + + fn size(&self) -> usize { + (**self).size() + } + + fn segments(&self) -> &Vec { + (**self).segments() + } +} + +// --- + +/// A Read implementation that reads from a ReplayBuf. +pub struct ReplayBufReader { + buf: B, cache: C, position: usize, } -impl ReplayBufReader> { - pub fn new(buf: ReplayBuf) -> Self { - Self::build(buf).result() +impl ReplayBufReader> { + /// Create a new ReplayBufReader with a default cache. 
+ pub fn new(buf: B) -> Self { + Self::build(buf).done() } - pub fn build(buf: ReplayBuf) -> ReplayBufReaderBuilder> { + /// Create a builder for ReplayBufReader with a default cache. + pub fn build(buf: B) -> ReplayBufReaderBuilder> { ReplayBufReaderBuilder { buf, cache: MinimalCache::new(), @@ -169,47 +221,47 @@ impl ReplayBufReader> { } } -impl> ReplayBufReader { +impl> ReplayBufReader { #[inline(always)] fn segment_size(&self) -> NonZeroUsize { - self.buf.segment_size + self.buf.segment_size() } fn segment(&mut self, index: usize) -> Result<&[u8]> { - if index >= self.buf.segments.len() { - panic!("logic error") - } + assert!(index < self.buf.segments().len()); let ss = self.segment_size().get(); - let data = &mut self.buf.segments; + let data = self.buf.segments(); self.cache.cache(index, || { let mut buf = vec![0; ss]; - data[index].decode(&mut buf)?; + let n = data[index].decode(&mut buf)?; + buf.truncate(n); + assert!(n == ss || index == data.len() - 1); Ok(buf) }) } fn from_start(&self, offset: u64) -> Option { - usize::try_from(offset).ok().filter(|&v| v <= self.buf.size) + usize::try_from(offset).ok().filter(|&v| v <= self.buf.size()) } fn from_current(&self, offset: i64) -> Option { usize::try_from(i64::try_from(self.position).ok()?.checked_add(offset)?) .ok() - .filter(|&v| v <= self.buf.size) + .filter(|&v| v <= self.buf.size()) } - fn from_end(&mut self, offset: i64) -> Option { - usize::try_from(i64::try_from(self.buf.size).ok()?.checked_add(offset)?) + fn from_end(&self, offset: i64) -> Option { + usize::try_from(i64::try_from(self.buf.size()).ok()?.checked_add(offset)?) 
.ok() - .filter(|&v| v <= self.buf.size) + .filter(|&v| v <= self.buf.size()) } } -impl> Read for ReplayBufReader { +impl> Read for ReplayBufReader { fn read(&mut self, buf: &mut [u8]) -> Result { let mut i = 0; let ss = self.segment_size().get(); - loop { + while self.position < self.buf.size() { let segment = self.position / self.segment_size(); let offset = self.position % self.segment_size(); let data = self.segment(segment)?; @@ -222,10 +274,11 @@ impl> Read for ReplayBufReader { return Ok(i); } } + Ok(i) } } -impl> Seek for ReplayBufReader { +impl> Seek for ReplayBufReader { fn seek(&mut self, pos: SeekFrom) -> Result { let pos = match pos { SeekFrom::Start(pos) => self.from_start(pos), @@ -233,29 +286,25 @@ impl> Seek for ReplayBufReader { SeekFrom::End(pos) => self.from_end(pos), }; let pos = pos.ok_or_else(|| Error::new(ErrorKind::InvalidInput, "position out of range"))?; - let pos = min(pos, self.buf.size); + let pos = min(pos, self.buf.size()); self.position = pos; u64::try_from(pos).map_err(|e| Error::new(ErrorKind::InvalidInput, e)) } } -impl From> for ReplayBufReader { - fn from(builder: ReplayBufReaderBuilder) -> Self { - builder.result() - } -} - // --- -pub struct ReplayBufReaderBuilder { - buf: ReplayBuf, +/// A builder for ReplayBufReader. +pub struct ReplayBufReaderBuilder { + buf: B, cache: C, position: usize, } -impl ReplayBufReaderBuilder { +impl ReplayBufReaderBuilder { + /// Set the cache to use for uncompressed segments read from ReplayBuf. #[allow(dead_code)] - pub fn cache(self, cache: C2) -> ReplayBufReaderBuilder { + pub fn cache(self, cache: C2) -> ReplayBufReaderBuilder { ReplayBufReaderBuilder { buf: self.buf, cache, @@ -263,13 +312,14 @@ impl ReplayBufReaderBuilder { } } + /// Set the position to start reading from. 
#[allow(dead_code)] - pub fn position(mut self, position: usize) -> Self { - self.position = position; - self + pub fn position(self, position: usize) -> Self { + Self { position, ..self } } - pub fn result(self) -> ReplayBufReader { + /// Create a new ReplayBufReader with the current configuration. + pub fn done(self) -> ReplayBufReader { ReplayBufReader { buf: self.buf, cache: self.cache, @@ -280,18 +330,180 @@ impl ReplayBufReaderBuilder { // --- +/// A Read + Seek implementation that reads from a Read source and caches the +/// data in a ReplayBuf. It supports seeking to any position and reading from there. It is useful +/// for reading from a source that does not support seeking, but where seeking is required. +pub struct ReplaySeekReader { + r: R, + w: ReplayBufCreator, + cache: C, + position: usize, + drained: bool, +} + +impl ReplaySeekReader> { + /// Create a new ReplaySeekReader with a default LRU cache. + pub fn new(r: R) -> Self { + Self::build(r).done() + } + + /// Create a builder for ReplaySeekReader with a default LRU cache. 
+ pub fn build(r: R) -> ReplaySeekReaderBuilder> { + ReplaySeekReaderBuilder { + r, + w: ReplayBufCreator::build(), + cache: LruCache::default(), + } + } +} + +impl ReplaySeekReader { + fn end(&self) -> usize { + self.w.buf.size + self.w.scratch.len() + } + + fn from_start(&self, offset: u64) -> Option { + usize::try_from(offset).ok() + } + + fn from_current(&self, offset: i64) -> Option { + usize::try_from(i64::try_from(self.position).ok()?.checked_add(offset)?).ok() + } + + fn from_end(&self, offset: i64) -> Option { + usize::try_from(i64::try_from(self.end()).ok()?.checked_add(offset)?).ok() + } +} + +impl> Read for ReplaySeekReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + if self.position == self.end() { + if self.drained { + return Ok(0); + } + let n = self.r.read(buf)?; + self.w.write_all(&buf[..n])?; + self.position += n; + if n == 0 { + self.w.flush()?; + self.drained = true; + } + return Ok(n); + } + + if self.position < self.w.buf.size { + let mut reader = ReplayBufReader { + buf: &self.w.buf, + cache: &mut self.cache, + position: self.position, + }; + let n = reader.read(buf)?; + self.position += n; + assert!(self.position <= self.w.buf.size); + return Ok(n); + } + + let offset = self.position - self.w.buf.size; + assert!(offset < self.w.scratch.len()); + let n = min(self.w.scratch.len() - offset, buf.len()); + buf[..n].copy_from_slice(&self.w.scratch.bytes()[offset..offset + n]); + self.position += n; + + return Ok(n); + } +} + +impl> Seek for ReplaySeekReader { + fn seek(&mut self, pos: SeekFrom) -> Result { + let pos = match pos { + SeekFrom::Start(pos) => self.from_start(pos), + SeekFrom::Current(pos) => self.from_current(pos), + SeekFrom::End(pos) => { + if !self.drained { + let old = self.position; + let result = std::io::copy(self, &mut std::io::sink()); + self.position = old; + result?; + assert!(self.drained); + } + self.from_end(pos) + } + }; + let pos = pos.filter(|&v| !self.drained || v <= self.w.buf.size); + let pos = 
pos.ok_or_else(|| Error::new(ErrorKind::InvalidInput, "position out of range"))?; + + if pos > self.end() { + let n = pos - self.end(); + let old = self.position; + self.position = self.end(); + let result = std::io::copy(&mut self.take(n as u64), &mut std::io::sink()); + self.position = old; + result?; + } + + assert!(pos <= self.end()); + self.position = pos; + + Ok(pos as u64) + } +} + +// --- + +/// A builder for ReplaySeekReader. +pub struct ReplaySeekReaderBuilder { + r: R, + w: ReplayBufCreatorBuilder, + cache: C, +} + +impl ReplaySeekReaderBuilder { + /// Set the cache to use for uncompressed segments read from ReplayBuf. + #[allow(dead_code)] + pub fn cache(self, cache: C2) -> ReplaySeekReaderBuilder { + ReplaySeekReaderBuilder { + r: self.r, + w: self.w, + cache, + } + } + + /// Set the segment size for the ReplayBuf. + #[allow(dead_code)] + pub fn segment_size(mut self, segment_size: NonZeroUsize) -> Self { + self.w.segment_size = segment_size; + self + } + + /// Create a new ReplaySeekReader with the current configuration. + pub fn done(self) -> ReplaySeekReader { + ReplaySeekReader { + r: self.r, + w: self.w.done(), + cache: self.cache, + position: 0, + drained: false, + } + } +} + +// --- + +/// A buffer that can be compressed and decompressed in segments. #[derive(Default)] pub struct CompressedBuf(Vec); impl CompressedBuf { + /// Create a new CompressedBuf from the given data. pub fn new(data: &[u8]) -> Result { let mut encoded = Vec::new(); FrameEncoder::new(&mut encoded).write_all(data)?; Ok(Self(encoded)) } - pub fn decode(&self, buf: &mut [u8]) -> Result<()> { - FrameDecoder::new(&self.0[..]).read_exact(buf) + /// Decode the compressed data into the given buffer. + pub fn decode(&self, buf: &mut [u8]) -> Result { + FrameDecoder::new(&self.0[..]).read_fill(buf) } } @@ -364,6 +576,7 @@ impl ReusableBuf { // --- +/// Minimal cache implementation that caches a single value. 
pub struct MinimalCache { data: Option<(Key, Buf)>, } @@ -385,8 +598,15 @@ impl Cache for Min } } +impl Default for MinimalCache { + fn default() -> Self { + Self::new() + } +} + // --- +/// LRU cache implementation that caches up to a given number of values. pub struct LruCache { limit: usize, data: BTreeMap<(Instant, Key), Buf>, @@ -395,6 +615,7 @@ pub struct LruCache { #[allow(dead_code)] impl LruCache { + /// Create a new LruCache with the given limit. pub fn new(limit: usize) -> Self { Self { limit, @@ -436,6 +657,12 @@ impl Cache for Lru } } +impl Default for LruCache { + fn default() -> Self { + Self::new(4) + } +} + // --- pub trait ReaderFactory { @@ -468,7 +695,7 @@ pub struct RewindingReader { impl RewindingReader> { #[allow(dead_code)] pub fn new(factory: F) -> Result { - Self::build(factory).result() + Self::build(factory).done() } pub fn build(factory: F) -> RewindingReaderBuilder> { @@ -595,7 +822,7 @@ impl RewindingReaderBuilder { self } - pub fn result(self) -> Result> { + pub fn done(self) -> Result> { Ok(RewindingReader { inner: self.factory.new_reader()?, factory: self.factory, @@ -620,12 +847,57 @@ impl ReadSeek for T {} #[cfg(test)] mod tests { use super::*; - use std::{io::Cursor, str}; + use core::str; + use nonzero_ext::nonzero; + use std::{io::Cursor, num::NonZero}; fn dual<'a>(b: &[u8]) -> (&str, &[u8]) { (str::from_utf8(b).unwrap(), b) } + #[test] + fn test_replay_buf() { + let mut w = ReplayBufCreator::build().segment_size(nonzero!(4 as usize)).done(); + w.write_all(b"Lorem ipsum dolor sit amet.").unwrap(); + w.flush().unwrap(); + w.flush().unwrap(); + let r = ReplayBuf::try_from(w).unwrap(); + + assert_eq!(r.segment_size().get(), 4); + assert_eq!(r.size(), 27); + assert_eq!(r.segments().len(), 7); + + let mut buf = vec![0; 27]; + let n = r.segments()[0].decode(&mut buf).unwrap(); + buf.truncate(n); + assert_eq!(n, 4); + assert_eq!(dual(&buf), dual(b"Lore")); + } + + #[test] + fn test_replay_buf_reader() { + let data = b"Lorem ipsum 
dolor sit amet."; + let mut creator = ReplayBufCreator::new(); + creator.write_all(data).unwrap(); + let buf = ReplayBuf::try_from(creator).unwrap(); + let mut r = ReplayBufReader::build(buf) + .cache(MinimalCache::default()) + .position(6) + .done(); + + let pos = r.seek(SeekFrom::Current(0)).unwrap(); + assert_eq!(pos, 6); + let mut buf = vec![0; 11]; + r.read_exact(&mut buf).unwrap(); + assert_eq!(dual(&buf), dual(b"ipsum dolor")); + + let pos = r.seek(SeekFrom::End(-9)).unwrap(); + assert_eq!(pos, 18); + let mut buf = vec![]; + r.read_to_end(&mut buf).unwrap(); + assert_eq!(dual(&buf), dual(b"sit amet.")); + } + fn test_rewinding_reader Box>(f: F) { let mut r = f(4, "Lorem ipsum dolor sit amet."); @@ -670,12 +942,22 @@ mod tests { Box::new( RewindingReader::build(move || Ok(Cursor::new(data.clone()))) .block_size(block_size.try_into().unwrap()) - .result() + .done() .unwrap(), ) }); } + #[test] + fn test_rewinding_reader_new() { + test_rewinding_reader(|block_size, data| { + let data = data.as_bytes().to_vec(); + let mut r = RewindingReader::new(move || Ok(Cursor::new(data.clone()))).unwrap(); + r.block_size = NonZero::try_from(block_size as u64).unwrap(); + Box::new(r) + }); + } + #[test] fn test_rewinding_reader_lru() { test_rewinding_reader(|block_size, data| { @@ -684,9 +966,58 @@ mod tests { RewindingReader::build(move || Ok(Cursor::new(data.clone()))) .block_size(block_size.try_into().unwrap()) .cache(LruCache::new(3)) - .result() + .done() .unwrap(), ) }); } + + #[test] + fn test_replay_seek_reader() { + let data = b"Lorem ipsum dolor sit amet."; + let s = |buf| str::from_utf8(buf).unwrap(); + let mut r = ReplaySeekReader::build(Cursor::new(data)) + .segment_size(nonzero!(4 as usize)) + .done(); + + let pos = r.seek(SeekFrom::Start(6)).unwrap(); + assert_eq!(pos, 6); + let mut buf = vec![0; 5]; + r.read_exact(&mut buf).unwrap(); + assert_eq!(s(&buf), "ipsum"); + + let pos = r.seek(SeekFrom::Current(7)).unwrap(); + assert_eq!(pos, 18); + let mut buf = 
vec![0; 3]; + r.read_exact(&mut buf).unwrap(); + assert_eq!(s(&buf), "sit"); + + let pos = r.seek(SeekFrom::Current(-9)).unwrap(); + assert_eq!(pos, 12); + let mut buf = vec![0; 5]; + r.read_exact(&mut buf).unwrap(); + assert_eq!(s(&buf), "dolor"); + + let pos = r.seek(SeekFrom::End(-5)).unwrap(); + assert_eq!(pos, 22); + let mut buf = vec![]; + r.read_to_end(&mut buf).unwrap(); + assert_eq!(s(&buf), "amet."); + + let pos = r.seek(SeekFrom::End(0)).unwrap(); + assert_eq!(pos, 27); + let mut buf = vec![]; + r.read_to_end(&mut buf).unwrap(); + assert_eq!(s(&buf), ""); + + let mut r = ReplaySeekReader::new(Cursor::new(data)); + let pos = r.seek(SeekFrom::End(0)).unwrap(); + assert_eq!(pos, 27); + + let mut r = ReplaySeekReader::build(Cursor::new(data)) + .cache(MinimalCache::default()) + .done(); + let pos = r.seek(SeekFrom::End(-7)).unwrap(); + assert_eq!(pos, 20); + } } diff --git a/src/settings.rs b/src/settings.rs index 84ab32e1..26cd9558 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -5,7 +5,7 @@ use std::include_str; // third-party imports use chrono_tz::Tz; use config::{Config, File, FileFormat}; -use derive_deref::Deref; +use derive_more::Deref; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize, Serializer}; use strum::IntoEnumIterator; @@ -113,6 +113,13 @@ pub struct PredefinedFields { pub caller_line: CallerLineField, } +impl Default for &PredefinedFields { + fn default() -> Self { + static DEFAULT: Lazy = Lazy::new(|| PredefinedFields::default()); + &DEFAULT + } +} + // --- #[derive(Debug, Serialize, Deserialize, Deref, Clone, PartialEq, Eq)] diff --git a/src/tee.rs b/src/tee.rs index 28659c53..09da79d1 100644 --- a/src/tee.rs +++ b/src/tee.rs @@ -6,14 +6,16 @@ use std::io::{Read, Result, Write}; pub struct TeeReader { reader: R, writer: W, + processed: usize, } impl TeeReader { #[inline] pub fn new(reader: R, writer: W) -> TeeReader { TeeReader { - reader: reader, - writer: writer, + reader, + writer, + processed: 0, } } @@ -34,12 +36,18 
@@ impl TeeReader { pub fn into(self) -> (R, W) { (self.reader, self.writer) } + + #[inline] + pub fn processed(&self) -> usize { + self.processed + } } impl Read for TeeReader { fn read(&mut self, buf: &mut [u8]) -> Result { let n = self.reader.read(buf)?; self.writer.write_all(&buf[..n])?; + self.processed += n; Ok(n) } } diff --git a/src/themecfg.rs b/src/themecfg.rs index a9540f50..bd4527c7 100644 --- a/src/themecfg.rs +++ b/src/themecfg.rs @@ -9,7 +9,7 @@ use std::{ }; // third-party imports -use derive_deref::Deref; +use derive_more::Deref; use enum_map::Enum; use rust_embed::RustEmbed; use serde::{ diff --git a/src/vfs.rs b/src/vfs.rs new file mode 100644 index 00000000..22a11898 --- /dev/null +++ b/src/vfs.rs @@ -0,0 +1,437 @@ +// stdlib imports +use std::{ + fs, + io::{self, Read, Seek, Write}, + path::{Path, PathBuf}, +}; + +// third-party imports +#[cfg(test)] +use mockall::mock; + +// --- + +pub trait FileSystem { + type Metadata: 'static; + + fn canonicalize(&self, path: &Path) -> io::Result; + fn metadata(&self, path: &Path) -> io::Result; + fn exists(&self, path: &Path) -> io::Result; + fn open(&self, path: &Path) -> io::Result + Send + Sync>>; + fn create(&self, path: &Path) -> io::Result + Send + Sync>>; +} + +#[cfg(test)] +mock! { + pub FileSystem {} + + impl FileSystem for FileSystem { + type Metadata = M; + + fn canonicalize(&self, path: &Path) -> io::Result; + fn metadata(&self, path: &Path) -> io::Result; + fn exists(&self, path: &Path) -> io::Result; + fn open(&self, path: &Path) -> io::Result + Send + Sync>>; + fn create(&self, path: &Path) -> io::Result + Send + Sync>>; + } +} + +macro_rules! 
delegate_fs_methods { + () => { + #[inline] + fn canonicalize(&self, path: &Path) -> io::Result { + (**self).canonicalize(path) + } + + #[inline] + fn metadata(&self, path: &Path) -> io::Result { + (**self).metadata(path) + } + + #[inline] + fn exists(&self, path: &Path) -> io::Result { + (**self).exists(path) + } + + #[inline] + fn open(&self, path: &Path) -> io::Result + Send + Sync>> { + (**self).open(path) + } + + #[inline] + fn create(&self, path: &Path) -> io::Result + Send + Sync>> { + (**self).create(path) + } + }; +} + +impl<'a, T> FileSystem for &'a T +where + T: FileSystem, +{ + type Metadata = T::Metadata; + delegate_fs_methods!(); +} + +impl FileSystem for std::sync::Arc +where + T: FileSystem, +{ + type Metadata = T::Metadata; + delegate_fs_methods!(); +} + +// --- + +pub trait Meta { + type Metadata; + + fn metadata(&self) -> io::Result; +} + +impl Meta for fs::File { + type Metadata = fs::Metadata; + + #[inline] + fn metadata(&self) -> io::Result { + self.metadata() + } +} + +// --- + +pub trait FileRead: Read + Seek + Meta {} + +impl FileRead for T {} + +#[cfg(test)] +mock! { + pub FileRead {} + + impl Read for FileRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result; + } + + impl Seek for FileRead { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result; + } + + impl Meta for FileRead { + type Metadata = M; + + fn metadata(&self) -> io::Result; + } +} + +// --- + +pub trait FileReadWrite: FileRead + Write {} + +impl FileReadWrite for T {} + +#[cfg(test)] +mock! 
{ + pub FileReadWrite {} + + impl Read for FileReadWrite { + fn read(&mut self, buf: &mut [u8]) -> io::Result; + } + + impl Seek for FileReadWrite { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result; + } + + impl Meta for FileReadWrite { + type Metadata = M; + + fn metadata(&self) -> io::Result; + } + + impl Write for FileReadWrite { + fn write(&mut self, buf: &[u8]) -> io::Result; + fn flush(&mut self) -> io::Result<()>; + } +} + +// --- + +#[derive(Default)] +pub struct LocalFileSystem; + +impl FileSystem for LocalFileSystem { + type Metadata = fs::Metadata; + + #[inline] + fn canonicalize(&self, path: &Path) -> io::Result { + fs::canonicalize(path) + } + + #[inline] + fn metadata(&self, path: &Path) -> io::Result { + fs::metadata(path) + } + + #[inline] + fn exists(&self, path: &Path) -> io::Result { + fs::exists(path) + } + + #[inline] + fn open(&self, path: &Path) -> io::Result + Send + Sync>> { + Ok(Box::new(fs::File::open(path)?)) + } + + #[inline] + fn create(&self, path: &Path) -> io::Result + Send + Sync>> { + Ok(Box::new(fs::File::create(path)?)) + } +} + +// --- + +#[cfg(test)] +pub mod mem { + use super::{FileRead, Meta}; + + use std::{ + collections::HashMap, + io::{self, Read, Seek, Write}, + path::{Path, PathBuf}, + sync::{Arc, RwLock}, + time::SystemTime, + }; + + use clean_path::Clean; + + // --- + + #[derive(Copy, Clone, Debug)] + #[allow(dead_code)] + pub struct Metadata { + pub len: usize, + pub created: SystemTime, + pub modified: SystemTime, + } + + #[derive(Copy, Clone)] + struct InternalMetadata { + pub created: SystemTime, + pub modified: SystemTime, + } + + impl InternalMetadata { + #[inline] + fn new() -> Self { + let now = SystemTime::now(); + + Self { + created: now, + modified: now, + } + } + } + + impl From<(usize, InternalMetadata)> for Metadata { + #[inline] + fn from(meta: (usize, InternalMetadata)) -> Self { + Self { + len: meta.0, + created: meta.1.created, + modified: meta.1.modified, + } + } + } + + struct File { + data: 
Vec, + meta: InternalMetadata, + } + + impl File { + #[inline] + fn new(data: Vec) -> Self { + Self { + data, + meta: InternalMetadata::new(), + } + } + + #[inline] + fn metadata(&self) -> Metadata { + (self.data.len(), self.meta).into() + } + } + + // --- + + struct FileCursor { + file: Arc>, + pos: usize, + } + + impl FileCursor { + #[inline] + fn new(file: Arc>) -> Self { + Self { file, pos: 0 } + } + } + + impl Read for FileCursor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let data = &self.file.read().unwrap().data; + let len = buf.len().min(data.len() - self.pos); + buf[..len].copy_from_slice(&data[self.pos..self.pos + len]); + self.pos += len; + Ok(len) + } + } + + impl Write for FileCursor { + fn write(&mut self, buf: &[u8]) -> io::Result { + let file = &mut self.file.write().unwrap(); + let data = &mut file.data; + if self.pos + buf.len() > data.len() { + data.resize(self.pos + buf.len(), 0); + } + data[self.pos..self.pos + buf.len()].copy_from_slice(buf); + file.meta.modified = SystemTime::now(); + self.pos += buf.len(); + Ok(buf.len()) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + impl Seek for FileCursor { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let new_pos = match pos { + io::SeekFrom::Start(offset) => offset as usize, + io::SeekFrom::Current(offset) => (self.pos as i64 + offset) as usize, + io::SeekFrom::End(offset) => (self.file.read().unwrap().data.len() as i64 + offset) as usize, + }; + self.pos = new_pos; + Ok(new_pos as u64) + } + } + + impl Meta for FileCursor { + type Metadata = Metadata; + + #[inline] + fn metadata(&self) -> io::Result { + Ok(self.file.read().unwrap().metadata()) + } + } + + // --- + + #[derive(Default)] + pub struct FileSystem { + files: RwLock>>>, + } + + impl FileSystem { + pub fn new() -> Self { + FileSystem { + files: RwLock::new(HashMap::new()), + } + } + } + + impl super::FileSystem for FileSystem { + type Metadata = Metadata; + + fn canonicalize(&self, 
path: &Path) -> io::Result { + Ok(PathBuf::from("/tmp").join(path).clean()) + } + + fn metadata(&self, path: &Path) -> io::Result { + let path = self.canonicalize(path)?; + let files = self.files.read().unwrap(); + if let Some(file) = files.get(&path) { + Ok(file.read().unwrap().metadata()) + } else { + Err(io::Error::new(io::ErrorKind::NotFound, "file not found")) + } + } + + fn exists(&self, path: &Path) -> io::Result { + let path = self.canonicalize(path)?; + let files = self.files.read().unwrap(); + Ok(files.contains_key(&path)) + } + + fn open(&self, path: &Path) -> io::Result + Send + Sync>> { + let path = self.canonicalize(path)?; + let files = self.files.read().unwrap(); + if let Some(file) = files.get(&path) { + Ok(Box::new(FileCursor::new(file.clone()))) + } else { + Err(io::Error::new(io::ErrorKind::NotFound, "file not found")) + } + } + + fn create( + &self, + path: &Path, + ) -> io::Result + Send + Sync>> { + let path = self.canonicalize(path)?; + let mut files = self.files.write().unwrap(); + if files.contains_key(&path) { + return Err(io::Error::new(io::ErrorKind::AlreadyExists, "file already exists")); + } + let file = Arc::new(RwLock::new(File::new(Vec::new()))); + files.insert(path.clone(), file.clone()); + Ok(Box::new(FileCursor::new(file))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_filesystem() { + let fs = mem::FileSystem::new(); + let path = Path::new("file.txt"); + + assert_eq!(fs.exists(path).unwrap(), false); + + let mut file = fs.create(path).unwrap(); + file.write_all(b"hello world").unwrap(); + file.flush().unwrap(); + + let res = fs.create(path); + assert!(res.is_err()); + assert_eq!(res.err().map(|e| e.kind()), Some(io::ErrorKind::AlreadyExists)); + + assert_eq!(fs.exists(path).unwrap(), true); + + let meta = fs.metadata(path).unwrap(); + assert_eq!(meta.len, 11); + + let res = fs.metadata(Path::new("nonexistent.txt")); + assert!(res.is_err()); + assert!(matches!(res.unwrap_err().kind(), 
io::ErrorKind::NotFound)); + + let canonical_path = fs.canonicalize(path).unwrap(); + assert_eq!(canonical_path, PathBuf::from("/tmp/file.txt")); + + let mut file = fs.open(path).unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).unwrap(); + + assert_eq!(buf, b"hello world"); + + let meta = file.metadata().unwrap(); + assert_eq!(meta.len, 11); + + let res = fs.open(Path::new("nonexistent.txt")); + assert!(res.is_err()); + assert_eq!(res.err().map(|e| e.kind()), Some(io::ErrorKind::NotFound)); + } +}