diff --git a/Cargo.lock b/Cargo.lock index ef7339e3..5f91a883 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -400,6 +400,12 @@ dependencies = [ "roff", ] +[[package]] +name = "clean-path" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaa6b4b263a5d737e9bf6b7c09b72c41a5480aec4d7219af827f6564e950b6a5" + [[package]] name = "closure" version = "0.3.0" @@ -971,6 +977,7 @@ dependencies = [ "clap", "clap_complete", "clap_mangen", + "clean-path", "closure", "collection_macros", "config", diff --git a/Cargo.toml b/Cargo.toml index 1338711f..39a11148 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ kqueue = "1" [dev-dependencies] byte-strings = "0" +clean-path = "0" criterion = "0" maplit = "1" mockall = "0" diff --git a/sample/test.log b/sample/test.log index 8d66ed89..f0714e90 100644 --- a/sample/test.log +++ b/sample/test.log @@ -1 +1,2 @@ -{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"} \ No newline at end of file +{"ts":"2024-10-01T01:02:04Z","level":"info","msg":"latest test message"} +{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"earlier test message"} diff --git a/src/app.rs b/src/app.rs index bb3fa8db..7ed72829 100644 --- a/src/app.rs +++ b/src/app.rs @@ -40,6 +40,7 @@ use crate::{ settings::{FieldShowOption, Fields, Formatting}, theme::{Element, StylingPush, Theme}, timezone::Tz, + vfs::LocalFileSystem, IncludeExcludeKeyFilter, {error::*, QueryNone}, }; @@ -272,6 +273,7 @@ impl App { fn sort(&self, inputs: Vec, output: &mut Output) -> Result<()> { let mut output = BufWriter::new(output); let indexer_settings = IndexerSettings::new( + LocalFileSystem, self.options.buffer_size.try_into()?, self.options.max_message_size.try_into()?, &self.options.fields.settings.predefined, @@ -499,8 +501,8 @@ impl App { let reader = scope.spawn(closure!(clone sfi, clone txi, |_| -> Result<()> { let scanner = Scanner::new(sfi.clone(), &self.options.delimiter); let mut meta = None; - if let InputReference::File(filename) = &input_ref { - meta = Some(fs::metadata(filename)?); + if let InputReference::File(path) = &input_ref { + meta = Some(fs::metadata(&path.canonical)?); } let mut input = Some(input_ref.open()?.tail(self.options.tail)?); let is_file = |meta: &Option| meta.as_ref().map(|m|m.is_file()).unwrap_or(false); @@ -516,14 +518,14 @@ impl App { Ok(false) } }; - if let InputReference::File(filename) = &input_ref { + if let InputReference::File(path) = &input_ref { if process(&mut input, is_file(&meta))? { return Ok(()) } - fsmon::run(vec![filename.clone()], |event| { + fsmon::run(vec![path.canonical.clone()], |event| { match event.kind { EventKind::Modify(_) | EventKind::Create(_) | EventKind::Any | EventKind::Other => { - if let (Some(old_meta), Ok(new_meta)) = (&meta, fs::metadata(&filename)) { + if let (Some(old_meta), Ok(new_meta)) = (&meta, fs::metadata(&path.canonical)) { if old_meta.len() > new_meta.len() { input = None; } @@ -691,7 +693,7 @@ impl App { fn input_badges<'a, I: IntoIterator>(&self, inputs: I) -> Option> { let name = |input: &InputReference| match input { InputReference::Stdin => "".to_owned(), - InputReference::File(path) => path.to_string_lossy().to_string(), + InputReference::File(path) => path.original.to_string_lossy().to_string(), }; let mut badges = inputs.into_iter().map(|x| name(x).chars().collect_vec()).collect_vec(); diff --git a/src/index.rs b/src/index.rs index 68d90016..af5cbe05 100644 --- a/src/index.rs +++ b/src/index.rs @@ -16,11 +16,11 @@ use std::{ cmp::{max, min}, convert::{Into, TryFrom, TryInto}, fmt::{self, Display}, - fs::{self, File}, - io::{self, Read, Seek, Write}, + fs::{self}, + io::{self, Read, Write}, iter::empty, num::{NonZero, NonZeroU32}, - path::PathBuf, + path::{Path, PathBuf}, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -46,6 +46,7 @@ use crate::{ model::{Parser, ParserSettings, RawRecord}, scanning::{Delimiter, Scanner, Segment, SegmentBuf, SegmentBufFactory}, settings::PredefinedFields, + vfs::{FileRead, FileSystem, LocalFileSystem}, }; // types @@ -130,19 +131,20 @@ impl std::ops::Sub for Timestamp { // --- #[derive(Default)] -pub struct IndexerSettings<'a, FS: FileSystem> { - fs: FS, - buffer_size: BufferSize, - max_message_size: MessageSize, - fields: &'a PredefinedFields, - delimiter: Delimiter, - allow_prefix: bool, - unix_ts_unit: Option, - format: Option, +pub struct IndexerSettings<'a, FS> { + pub fs: FS, + pub buffer_size: BufferSize, + pub max_message_size: MessageSize, + pub fields: &'a PredefinedFields, + pub delimiter: Delimiter, + pub allow_prefix: bool, + pub unix_ts_unit: Option, + pub format: Option, } -impl<'a, FS: FileSystem + Default> IndexerSettings<'a, FS> { +impl<'a, FS: FileSystem> IndexerSettings<'a, FS> { pub fn new( + fs: FS, buffer_size: BufferSize, max_message_size: MessageSize, fields: &'a PredefinedFields, @@ -152,7 +154,7 @@ impl<'a, FS: FileSystem + Default> IndexerSettings<'a, FS> { format: Option, ) -> Self { Self { - fs: FS::default(), + fs, buffer_size, max_message_size, fields, @@ -242,7 +244,7 @@ impl From for u32 { // --- /// Allows log files indexing to enable message sorting. -pub struct Indexer { +pub struct Indexer { fs: FS, concurrency: usize, buffer_size: u32, @@ -254,7 +256,10 @@ pub struct Indexer { format: Option, } -impl Indexer { +impl Indexer +where + FS::Metadata: SourceMetadata, +{ /// Returns a new Indexer with the given parameters. pub fn new(concurrency: usize, dir: PathBuf, settings: IndexerSettings<'_, FS>) -> Self { Self { @@ -278,41 +283,36 @@ impl Indexer { /// Builds index for the given file. /// /// Builds the index, saves it to disk and returns it. - pub fn index(&self, source_path: &PathBuf) -> Result { - let (source_path, _, index_path, index, actual) = self.prepare(source_path, None)?; + pub fn index(&self, source_path: &Path) -> Result { + let (source_path, mut stream) = self.open_source(source_path)?; + let meta = Metadata::from(&stream.metadata()?)?; + let (index_path, index, actual) = self.prepare(&source_path, &meta)?; if actual { return Ok(index.unwrap()); } - self.build_index(&source_path, &index_path, index) + self.build_index_from_stream(&mut stream, &source_path, &meta, &index_path, index) } /// Builds index for the given file represended by a stream. /// /// The stream may be an uncompressed representation of the file. - pub fn index_stream( - &self, - stream: &mut Reader, - source_path: Option<&PathBuf>, - meta: Option, - ) -> Result { - let Some(source_path) = source_path else { - return self.index_in_memory(stream); - }; - - let (source_path, meta, index_path, index, actual) = self.prepare(source_path, meta)?; + /// The source_path parameter must be the canonical path of the file. + pub fn index_stream(&self, stream: &mut Reader, source_path: &Path, meta: &fs::Metadata) -> Result { + let meta = &meta.try_into()?; + let (index_path, index, actual) = self.prepare(source_path, meta)?; if actual { return Ok(index.unwrap()); } - self.build_index_from_stream(stream, &source_path, &index_path, meta, index) + self.build_index_from_stream(stream, source_path, meta, &index_path, index) } /// Builds an in-memory index for the given stream. pub fn index_in_memory(&self, input: &mut Reader) -> Result { self.process_file( &PathBuf::from(""), - Metadata { + &Metadata { len: 0, modified: (0, 0), }, @@ -322,24 +322,17 @@ impl Indexer { ) } - fn prepare( - &self, - source_path: &PathBuf, - meta: Option, - ) -> Result<(PathBuf, fs::Metadata, PathBuf, Option, bool)> { - let source_path = self.fs.canonicalize(source_path)?; - let meta = match meta { - Some(meta) => meta, - None => self.fs.metadata(&source_path)?, - }; + fn prepare(&self, source_path: &Path, meta: &Metadata) -> Result<(PathBuf, Option, bool)> { + assert_eq!(source_path, &self.fs.canonicalize(source_path)?); + let hash = hex::encode(sha256(source_path.to_string_lossy().as_bytes())); let index_path = self.dir.join(PathBuf::from(hash)); let mut existing_index = None; let mut actual = false; - log::debug!("canonical source path: {}", source_path.display()); - log::debug!("index file path: {}", index_path.display()); - log::debug!("source meta: size={} modified={:?}", meta.len(), ts(meta.modified()?)); + log::debug!("source path: {}", source_path.display()); + log::debug!("index file path: {}", index_path.display()); + log::debug!("source meta: size={} modified={:?}", meta.len, meta.modified); if self.fs.exists(&index_path)? { let mut file = match self.fs.open(&index_path) { @@ -357,45 +350,21 @@ impl Indexer { index.source().size, index.source().modified ); - if meta.len() == index.source().size && ts(meta.modified()?) == index.source().modified { + if meta.len == index.source().size && meta.modified == index.source().modified { actual = true; } existing_index = Some(index); } } - Ok((source_path, meta, index_path, existing_index, actual)) - } - - fn build_index(&self, source_path: &PathBuf, index_path: &PathBuf, existing_index: Option) -> Result { - let mut input = match self.fs.open(&source_path) { - Ok(input) => input, - Err(err) => { - return Err(Error::FailedToOpenFileForReading { - path: source_path.clone(), - source: err, - }); - } - }; - - let meta = match self.fs.metadata(&source_path) { - Ok(metadata) => metadata, - Err(err) => { - return Err(Error::FailedToGetFileMetadata { - path: source_path.clone(), - source: err, - }); - } - }; - - self.build_index_from_stream(&mut input, source_path, index_path, meta, existing_index) + Ok((index_path, existing_index, actual)) } fn build_index_from_stream( &self, stream: &mut Reader, - source_path: &PathBuf, + source_path: &Path, + meta: &Metadata, index_path: &PathBuf, - metadata: fs::Metadata, existing_index: Option, ) -> Result { let mut output = match self.fs.create(&index_path) { @@ -408,19 +377,13 @@ impl Indexer { } }; - self.process_file( - &source_path, - (&metadata).try_into()?, - stream, - &mut output, - existing_index, - ) + self.process_file(source_path, meta, stream, &mut output, existing_index) } fn process_file( &self, - path: &PathBuf, - metadata: Metadata, + path: &Path, + metadata: &Metadata, input: &mut Reader, output: &mut Writer, existing_index: Option, @@ -628,49 +591,47 @@ impl Indexer { }) }) } -} -// --- - -pub trait ReadSeek: Read + Seek {} - -impl ReadSeek for T {} + fn open_source( + &self, + source_path: &Path, + ) -> io::Result<(PathBuf, Box + Send + Sync>)> { + let source_path = self.fs.canonicalize(source_path)?; + let result = self.fs.open(&source_path)?; + Ok((source_path, result)) + } +} // --- #[cfg_attr(test, automock)] -pub trait FileSystem { - fn canonicalize(&self, path: &PathBuf) -> io::Result; - fn metadata(&self, path: &PathBuf) -> io::Result; - fn exists(&self, path: &PathBuf) -> io::Result; - fn open(&self, path: &PathBuf) -> io::Result>; - fn create(&self, path: &PathBuf) -> io::Result>; +pub trait SourceMetadata { + fn len(&self) -> u64; + fn modified(&self) -> io::Result; } -// --- - -#[derive(Default)] -pub struct RealFileSystem; - -impl FileSystem for RealFileSystem { - fn canonicalize(&self, path: &PathBuf) -> io::Result { - fs::canonicalize(path) - } - - fn metadata(&self, path: &PathBuf) -> io::Result { - fs::metadata(path) +impl SourceMetadata for fs::Metadata { + #[inline] + fn len(&self) -> u64 { + self.len() } - fn exists(&self, path: &PathBuf) -> io::Result { - fs::exists(path) + #[inline] + fn modified(&self) -> io::Result { + self.modified() } +} - fn open(&self, path: &PathBuf) -> io::Result> { - Ok(Box::new(File::open(path)?)) +#[cfg(test)] +impl SourceMetadata for crate::vfs::mem::Metadata { + #[inline] + fn len(&self) -> u64 { + self.len as u64 } - fn create(&self, path: &PathBuf) -> io::Result> { - Ok(Box::new(File::create(path)?)) + #[inline] + fn modified(&self) -> io::Result { + Ok(self.modified) } } @@ -1082,19 +1043,43 @@ impl Header { // --- +#[derive(Clone, Copy, PartialEq, Eq, Debug)] struct Metadata { len: u64, modified: (i64, u32), } +impl Metadata { + pub fn from(source: &M) -> io::Result { + Ok(Self { + len: source.len(), + modified: ts(source.modified()?), + }) + } +} + impl TryFrom<&fs::Metadata> for Metadata { type Error = io::Error; fn try_from(value: &fs::Metadata) -> io::Result { - Ok(Self { - len: value.len(), - modified: ts(value.modified()?), - }) + Self::from(value) + } +} + +impl TryFrom for Metadata { + type Error = io::Error; + + fn try_from(value: fs::Metadata) -> io::Result { + Self::from(&value) + } +} + +#[cfg(test)] +impl TryFrom<&MockSourceMetadata> for Metadata { + type Error = io::Error; + + fn try_from(value: &MockSourceMetadata) -> io::Result { + Self::from(value) } } @@ -1189,6 +1174,8 @@ const CURRENT_VERSION: u64 = 1; mod tests { use super::*; + use crate::vfs::{self, MockFileSystem}; + #[test] fn test_process_file_success() { use io::Cursor; @@ -1196,7 +1183,7 @@ mod tests { 1, PathBuf::from("/tmp/cache"), IndexerSettings { - fs: MockFileSystem::new(), + fs: MockFileSystem::::new(), buffer_size: nonzero!(1024u32).into(), max_message_size: nonzero!(1024u32).into(), ..Default::default() @@ -1212,7 +1199,7 @@ mod tests { let index = indexer .process_file( &PathBuf::from("/tmp/test.log"), - Metadata { + &Metadata { len: data.len() as u64, modified: (1714739340, 0), }, @@ -1251,7 +1238,7 @@ mod tests { #[test] fn test_process_file_error() { use io::Cursor; - let fs = MockFileSystem::new(); + let fs = MockFileSystem::::new(); let indexer = Indexer::new( 1, PathBuf::from("/tmp/cache"), @@ -1266,7 +1253,7 @@ mod tests { let mut output = Cursor::new(Vec::new()); let result = indexer.process_file( &PathBuf::from("/tmp/test.log"), - Metadata { + &Metadata { len: 135, modified: (1714739340, 0), }, @@ -1278,6 +1265,37 @@ mod tests { assert_eq!(output.into_inner().len(), 0); } + #[test] + fn test_indexer() { + let fs = vfs::mem::FileSystem::new(); + + let data = br#"ts=2024-01-02T03:04:05Z msg="some test message""#; + let mut file = fs.create(&PathBuf::from("test.log")).unwrap(); + file.write_all(data).unwrap(); + + let indexer = Indexer::new( + 1, + PathBuf::from("/tmp/cache"), + IndexerSettings { + fs, + ..Default::default() + }, + ); + + let index1 = indexer.index(&PathBuf::from("test.log")).unwrap(); + + assert_eq!(index1.source.size, 47); + assert_eq!(index1.source.path, "/tmp/test.log"); + assert_eq!(index1.source.stat.lines_valid, 1); + assert_eq!(index1.source.stat.lines_invalid, 0); + assert_eq!(index1.source.stat.flags, schema::FLAG_HAS_TIMESTAMPS); + assert_eq!(index1.source.blocks.len(), 1); + + let index2 = indexer.index(&PathBuf::from("test.log")).unwrap(); + assert_eq!(index2.source.size, index1.source.size); + assert_eq!(index2.source.modified, index1.source.modified); + } + // --- struct FailingReader; diff --git a/src/input.rs b/src/input.rs index 46f8b86c..d63de4fb 100644 --- a/src/input.rs +++ b/src/input.rs @@ -2,11 +2,11 @@ use std::{ cmp::min, convert::TryInto, - fs::{File, Metadata}, - io::{self, stdin, BufRead, BufReader, Cursor, Read, Seek, SeekFrom}, + fs::{self, File, Metadata}, + io::{self, stdin, BufRead, BufReader, Cursor, Read, Seek, SeekFrom, Write}, mem::size_of_val, ops::{Deref, Range}, - path::PathBuf, + path::{Path, PathBuf}, sync::{Arc, Mutex}, }; @@ -15,12 +15,15 @@ use deko::{bufread::AnyDecoder, Format}; use nu_ansi_term::Color; // local imports -use crate::error::Result; -use crate::index::{Index, Indexer, SourceBlock}; -use crate::iox::ReadFill; -use crate::pool::SQPool; -use crate::replay::{ReplayBufCreator, ReplayBufReader, ReplaySeekReader}; -use crate::tee::TeeReader; +use crate::{ + error::{Result, HILITE}, + index::{Index, Indexer, SourceBlock, SourceMetadata}, + iox::ReadFill, + pool::SQPool, + replay::{ReplayBufCreator, ReplayBufReader, ReplaySeekReader}, + tee::TeeReader, + vfs::{FileSystem, LocalFileSystem}, +}; // --- @@ -30,40 +33,95 @@ pub type BufPool = SQPool>; // --- +/// The path to an input file. +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct InputPath { + pub original: PathBuf, + pub canonical: PathBuf, +} + +impl InputPath { + /// Creates a new input path. + pub fn new(original: PathBuf, canonical: PathBuf) -> Self { + Self { original, canonical } + } + + /// Resolves the canonical path for the given path. + pub fn resolve(original: PathBuf) -> io::Result { + Self::resolve_with_fs(original, LocalFileSystem) + } + + /// Resolves the canonical path for the given path using the specified file system. + pub fn resolve_with_fs(original: PathBuf, fs: FS) -> io::Result { + let canonical = fs.canonicalize(&original).map_err(|e| { + io::Error::new( + e.kind(), + format!( + "failed to resolve path for '{}': {}", + HILITE.paint(original.to_string_lossy()), + e + ), + ) + })?; + + Ok(Self { original, canonical }) + } + + /// Creates an ephemeral path. + pub fn ephemeral(original: PathBuf) -> Self { + Self { + original: original.clone(), + canonical: original, + } + } +} + +impl TryFrom for InputPath { + type Error = io::Error; + + fn try_from(original: PathBuf) -> io::Result { + Self::resolve(original) + } +} + +// --- + /// A reference to an input file or stdin. #[derive(Clone, PartialEq, Eq, Debug)] pub enum InputReference { Stdin, - File(PathBuf), + File(InputPath), } impl InputReference { /// Preliminarily opens the input file to ensure it exists and is readable /// and protect it from being suddenly deleted while we need it. pub fn hold(&self) -> io::Result { - Ok(InputHolder::new( - self.clone(), - match self { - InputReference::Stdin => None, - InputReference::File(path) => { - let meta = std::fs::metadata(path).map_err(|e| { - io::Error::new( - e.kind(), - format!("failed to get information on {}: {}", self.description(), e), - ) - })?; - if meta.is_dir() { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - format!("{} is a directory", self.description()), - )); - } - Some(Box::new(File::open(path).map_err(|e| { - io::Error::new(e.kind(), format!("failed to open {}: {}", self.description(), e)) - })?)) + let (reference, stream): (_, Option>) = match self { + Self::Stdin => (self.clone(), None), + Self::File(path) => { + let meta = fs::metadata(&path.canonical).map_err(|e| { + io::Error::new( + e.kind(), + format!("failed to get information on {}: {}", self.description(), e), + ) + })?; + if meta.is_dir() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("{} is a directory", self.description()), + )); } - }, - )) + let stream = + Box::new(File::open(&path.canonical).map_err(|e| { + io::Error::new(e.kind(), format!("failed to open {}: {}", self.description(), e)) + })?); + + (InputReference::File(path.clone()), Some(stream)) + } + }; + + Ok(InputHolder::new(reference, stream)) } /// Completely opens the input for reading. @@ -76,7 +134,7 @@ impl InputReference { pub fn description(&self) -> String { match self { Self::Stdin => "".into(), - Self::File(filename) => format!("file '{}'", Color::Yellow.paint(filename.to_string_lossy())), + Self::File(path) => format!("file '{}'", Color::Yellow.paint(path.original.to_string_lossy())), } } @@ -84,7 +142,7 @@ impl InputReference { fn path(&self) -> Option<&PathBuf> { match self { Self::Stdin => None, - Self::File(path) => Some(path), + Self::File(path) => Some(&path.canonical), } } } @@ -110,14 +168,14 @@ impl Meta for &mut T { } } -impl Meta for std::fs::File { +impl Meta for fs::File { #[inline] fn metadata(&self) -> io::Result> { self.metadata().map(Some) } } -impl Meta for std::io::Stdin { +impl Meta for io::Stdin { #[inline] fn metadata(&self) -> io::Result> { Ok(None) @@ -160,7 +218,11 @@ impl InputHolder { } /// Indexes the input file and returns IndexedInput that can be used to access the data in random order. - pub fn index(self, indexer: &Indexer) -> Result { + pub fn index(self, indexer: &Indexer) -> Result + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { self.open()?.indexed(indexer) } @@ -199,14 +261,18 @@ impl Input { } /// Indexes the input file and returns IndexedInput that can be used to access the data in random order. - pub fn indexed(self, indexer: &Indexer) -> Result { + pub fn indexed(self, indexer: &Indexer) -> Result + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { IndexedInput::from_stream(self.reference, self.stream, indexer) } /// Opens the file for reading. /// This includes decoding compressed files if needed. pub fn open(path: &PathBuf) -> io::Result { - InputReference::File(path.clone()).open() + InputReference::File(path.clone().try_into()?).open() } /// Opens the stdin for reading. @@ -419,8 +485,14 @@ impl IndexedInput { } /// Opens the input file and indexes it. - pub fn open(path: &PathBuf, indexer: &Indexer) -> Result { - InputReference::File(path.clone()).hold()?.index(indexer) + pub fn open(path: &Path, indexer: &Indexer) -> Result + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + InputReference::File(PathBuf::from(path).try_into()?) + .hold()? + .index(indexer) } /// Converts the input to blocks. @@ -429,45 +501,81 @@ impl IndexedInput { Blocks::new(Arc::new(self), (0..n).into_iter()) } - #[inline] - fn from_stream(reference: InputReference, stream: Stream, indexer: &Indexer) -> Result { - match stream { - Stream::Sequential(stream) => Self::from_sequential_stream(reference, stream, indexer), - Stream::RandomAccess(stream) => Self::from_random_access_stream(reference, stream, indexer), + fn from_stream(reference: InputReference, stream: Stream, indexer: &Indexer) -> Result + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + let (stream, index) = Self::index_stream(&reference, stream, indexer)?; + Ok(Self::new(reference, stream, index)) + } + + fn index_stream( + reference: &InputReference, + stream: Stream, + indexer: &Indexer, + ) -> Result<(RandomAccessStream, Index)> + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { + log::info!("indexing {}", reference.description()); + + if let (Some(path), Some(meta)) = (reference.path(), stream.metadata()?) { + match stream { + Stream::Sequential(stream) => Self::index_sequential_stream(path, &meta, stream, indexer), + Stream::RandomAccess(stream) => Self::index_random_access_stream(path, &meta, stream, indexer), + } + } else { + let mut tee = TeeReader::new(stream, ReplayBufCreator::new()); + let index = indexer.index_in_memory(&mut tee)?; + let buf = tee.into_writer().result()?; + let stream = Box::new(ReplayBufReader::new(buf).with_metadata(None)); + + Ok((stream, index)) } } - fn from_random_access_stream( - reference: InputReference, + fn index_random_access_stream( + path: &Path, + meta: &Metadata, mut stream: RandomAccessStream, - indexer: &Indexer, - ) -> Result { + indexer: &Indexer, + ) -> Result<(RandomAccessStream, Index)> + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { let pos = stream.seek(SeekFrom::Current(0))?; - let meta = stream.metadata()?; - let index = indexer.index_stream( - &mut stream, - match &reference { - InputReference::File(path) => Some(path), - _ => None, - }, - meta, - )?; + let index = indexer.index_stream(&mut stream, path, meta)?; + stream.seek(SeekFrom::Start(pos))?; - Ok(Self::new(reference, stream, index)) + + Ok((stream, index)) } - fn from_sequential_stream(reference: InputReference, stream: SequentialStream, indexer: &Indexer) -> Result { - log::info!("indexing {}", reference.description()); - let meta = stream.metadata()?; + fn index_sequential_stream( + path: &PathBuf, + meta: &Metadata, + stream: SequentialStream, + indexer: &Indexer, + ) -> Result<(RandomAccessStream, Index)> + where + FS: FileSystem + Sync, + FS::Metadata: SourceMetadata, + { let mut tee = TeeReader::new(stream, ReplayBufCreator::new()); - let index = indexer.index_stream(&mut tee, reference.path(), meta.clone())?; + let index = indexer.index_stream(&mut tee, path, meta)?; + let meta = meta.clone(); + let stream: RandomAccessStream = if tee.processed() == 0 { - Box::new(ReplaySeekReader::new(tee.into_reader()).with_metadata(meta)) + Box::new(ReplaySeekReader::new(tee.into_reader()).with_metadata(Some(meta))) } else { let buf = tee.into_writer().result()?; - Box::new(ReplayBufReader::new(buf).with_metadata(meta)) + Box::new(ReplayBufReader::new(buf).with_metadata(Some(meta))) }; - Ok(Self::new(reference, stream, index)) + + Ok((stream, index)) } } @@ -803,24 +911,40 @@ struct WithMetadata { } impl WithMetadata { + #[inline] fn new(inner: T, meta: Option) -> Self { Self { inner, meta } } } impl Read for WithMetadata { + #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } } +impl Write for WithMetadata { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + impl Seek for WithMetadata { + #[inline] fn seek(&mut self, pos: SeekFrom) -> io::Result { self.inner.seek(pos) } } impl Meta for WithMetadata { + #[inline] fn metadata(&self) -> io::Result> { Ok(self.meta.clone()) } @@ -830,13 +954,16 @@ impl Meta for WithMetadata { #[cfg(test)] mod tests { - use itertools::Itertools; + use super::*; + use io::Read; - use crate::index::IndexerSettings; + use itertools::Itertools; + use nonzero_ext::nonzero; - use super::*; - use std::io::ErrorKind; - use std::io::Read; + use crate::{ + index::IndexerSettings, + vfs::{self, LocalFileSystem}, + }; #[test] fn test_input_reference() { @@ -845,14 +972,14 @@ mod tests { assert_eq!(reference.path(), None); let input = reference.open().unwrap(); assert_eq!(input.reference, reference); - let reference = InputReference::File(PathBuf::from("test.log")); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from("test.log"))); assert_eq!(reference.description(), "file '\u{1b}[33mtest.log\u{1b}[0m'"); assert_eq!(reference.path(), Some(&PathBuf::from("test.log"))); } #[test] fn test_input_holder() { - let reference = InputReference::File(PathBuf::from("sample/test.log")); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from("sample/test.log"))); let holder = InputHolder::new(reference, None); let mut stream = holder.stream().unwrap(); let mut buf = Vec::new(); @@ -861,11 +988,8 @@ mod tests { let stream = stream.as_sequential(); let meta = stream.metadata().unwrap(); assert_eq!(meta.is_some(), true); - assert_eq!(n, 70); - assert_eq!( - buf, - br#"{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"}"# - ); + assert_eq!(n, 147); + assert_eq!(buf.len(), n); } #[test] @@ -888,7 +1012,8 @@ mod tests { for &(filename, requested, expected) in &[ ("sample/test.log", 1, 1), - ("sample/test.log", 2, 1), + ("sample/test.log", 2, 2), + ("sample/test.log", 3, 2), ("sample/prometheus.log", 2, 2), ] { let input = Input::open(&PathBuf::from(filename)).unwrap().tail(requested).unwrap(); @@ -944,39 +1069,39 @@ mod tests { let result = stream.into_sequential().read(&mut buf); assert!(result.is_err()); let err = result.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::Other); + assert_eq!(err.kind(), io::ErrorKind::Other); } #[test] fn test_input_read_error() { - let reference = InputReference::File(PathBuf::from("test.log")); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from("test.log"))); let input = Input::new(reference, Stream::Sequential(Box::new(FailingReader))); let mut buf = [0; 128]; let result = input.stream.into_sequential().read(&mut buf); assert!(result.is_err()); let err = result.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::Other); + assert_eq!(err.kind(), io::ErrorKind::Other); assert_eq!(err.to_string().contains("test.log"), true); } #[test] fn test_input_hold_error_is_dir() { - let reference = InputReference::File(PathBuf::from(".")); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from("."))); let result = reference.hold(); assert!(result.is_err()); let err = result.err().unwrap(); - assert_eq!(err.kind(), ErrorKind::InvalidInput); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); assert_eq!(err.to_string().contains("is a directory"), true); } #[test] fn test_input_hold_error_not_found() { let filename = "AKBNIJGHERHBNMCKJABHSDJ"; - let reference = InputReference::File(PathBuf::from(filename)); + let reference = InputReference::File(InputPath::ephemeral(PathBuf::from(filename))); let result = reference.hold(); assert!(result.is_err()); let err = result.err().unwrap(); - assert_eq!(err.kind(), ErrorKind::NotFound); + assert_eq!(err.kind(), io::ErrorKind::NotFound); assert_eq!(err.to_string().contains(filename), true); } @@ -996,10 +1121,10 @@ mod tests { } #[test] - fn test_indexed_input() { + fn test_indexed_input_stdin() { let data = br#"{"ts":"2024-10-01T01:02:03Z","level":"info","msg":"some test message"}\n"#; let stream = Stream::RandomAccess(Box::new(Cursor::new(data))); - let indexer = Indexer::new(1, PathBuf::new(), IndexerSettings::default()); + let indexer = Indexer::::new(1, PathBuf::new(), IndexerSettings::default()); let input = IndexedInput::from_stream(InputReference::Stdin, stream, &indexer).unwrap(); let mut blocks = input.into_blocks().collect_vec(); assert_eq!(blocks.len(), 1); @@ -1010,29 +1135,93 @@ mod tests { assert_eq!(line.bytes(), data); } + #[test] + fn test_indexed_input_file_random_access() { + let fs = Arc::new(vfs::mem::FileSystem::new()); + + for _ in 0..2 { + let path = PathBuf::from("sample/test.log"); + let indexer = Indexer::new( + 1, + PathBuf::from("."), + IndexerSettings { + fs: fs.clone(), + buffer_size: nonzero!(64u32).into(), + ..Default::default() + }, + ); + let input = IndexedInput::open(&path, &indexer).unwrap(); + let mut blocks = input.into_blocks().sorted().collect_vec(); + assert_eq!(blocks.len(), 2); + assert_eq!(blocks[0].lines_valid(), 1); + assert_eq!(blocks[0].size(), 74); + assert_eq!(blocks[1].lines_valid(), 1); + assert_eq!(blocks[1].size(), 73); + let lines = blocks.pop().unwrap().into_lines().unwrap().collect_vec(); + assert_eq!(lines.len(), 1); + assert_eq!(lines[0].len(), 73); + let lines = blocks.pop().unwrap().into_lines().unwrap().collect_vec(); + assert_eq!(lines.len(), 1); + assert_eq!(lines[0].len(), 74); + } + } + + #[test] + fn test_indexed_input_sequential_access() { + let fs = Arc::new(vfs::mem::FileSystem::new()); + + for _ in 0..2 { + let path = PathBuf::from("sample/test.log"); + let indexer = Indexer::new( + 1, + PathBuf::from("/tmp/cache"), + IndexerSettings { + fs: fs.clone(), + buffer_size: nonzero!(64u32).into(), + ..Default::default() + }, + ); + let reference = InputReference::File(InputPath::resolve_with_fs(path.clone(), &fs).unwrap()); + let stream = Stream::Sequential(Box::new(File::open(&path).unwrap())); + let input = IndexedInput::from_stream(reference, stream, &indexer).unwrap(); + let mut blocks = input.into_blocks().sorted().collect_vec(); + assert_eq!(blocks.len(), 2); + assert_eq!(blocks[0].lines_valid(), 1); + assert_eq!(blocks[0].size(), 74); + assert_eq!(blocks[1].lines_valid(), 1); + assert_eq!(blocks[1].size(), 73); + let lines = blocks.pop().unwrap().into_lines().unwrap().collect_vec(); + assert_eq!(lines.len(), 1); + assert_eq!(lines[0].len(), 73); + let lines = blocks.pop().unwrap().into_lines().unwrap().collect_vec(); + assert_eq!(lines.len(), 1); + assert_eq!(lines[0].len(), 74); + } + } + // --- struct FailingReader; impl Read for FailingReader { - fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { - Err(std::io::Error::new(std::io::ErrorKind::Other, "read error")) + fn read(&mut self, _buf: &mut [u8]) -> io::Result { + Err(io::Error::new(io::ErrorKind::Other, "read error")) } } impl Seek for FailingReader { - fn seek(&mut self, from: SeekFrom) -> std::io::Result { + fn seek(&mut self, from: SeekFrom) -> io::Result { match from { SeekFrom::Start(0) => Ok(0), SeekFrom::Current(0) => Ok(0), SeekFrom::End(0) => Ok(0), - _ => Err(std::io::Error::new(std::io::ErrorKind::Other, "seek error")), + _ => Err(io::Error::new(io::ErrorKind::Other, "seek error")), } } } impl Meta for FailingReader { - fn metadata(&self) -> std::io::Result> { + fn metadata(&self) -> std::io::Result> { Ok(None) } } @@ -1042,19 +1231,19 @@ mod tests { struct UnseekableReader(R); impl Read for UnseekableReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } } impl Seek for UnseekableReader { - fn seek(&mut self, _: SeekFrom) -> std::io::Result { - Err(std::io::Error::new(std::io::ErrorKind::Other, "seek error")) + fn seek(&mut self, _: SeekFrom) -> io::Result { + Err(io::Error::new(io::ErrorKind::Other, "seek error")) } } impl Meta for UnseekableReader { - fn metadata(&self) -> std::io::Result> { + fn metadata(&self) -> io::Result> { Ok(None) } } diff --git a/src/lib.rs b/src/lib.rs index 0eb2afcd..7b287f48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ mod replay; mod scanning; mod serdex; mod tee; +mod vfs; // conditional public modules #[cfg_attr(unix, path = "signal_unix.rs")] diff --git a/src/main.rs b/src/main.rs index 938a2897..008edaa4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -245,18 +245,18 @@ fn run() -> Result<()> { flatten: opt.flatten != cli::FlattenOption::Never, }); - // Configure input. + // Configure the input. let mut inputs = opt .files .iter() .map(|x| { if x.to_str() == Some("-") { - InputReference::Stdin + Ok::<_, std::io::Error>(InputReference::Stdin) } else { - InputReference::File(x.clone()) + Ok(InputReference::File(x.clone().try_into()?)) } }) - .collect::>(); + .collect::, _>>()?; if inputs.len() == 0 { if stdin().is_terminal() { let mut cmd = cli::Opt::command(); diff --git a/src/replay.rs b/src/replay.rs index e3f7bdbf..a043a5a2 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -26,6 +26,7 @@ type Buf = Vec; // --- +/// A trait that allows caching data for a given key. pub trait Cache { type Key: Copy + Clone + Eq + PartialEq + Ord + PartialOrd + Hash; @@ -42,6 +43,7 @@ impl Cache for &mut C { // --- +/// A buffer that holds compressed segments of continuous data. pub struct ReplayBuf { segment_size: NonZeroUsize, segments: Vec, @@ -82,22 +84,26 @@ impl ReplayBufRead for ReplayBuf { // --- +/// A creator for ReplayBuf. pub struct ReplayBufCreator { buf: ReplayBuf, scratch: ReusableBuf, } impl ReplayBufCreator { + /// Create a new ReplayBufCreator with the default options. pub fn new() -> Self { Self::build().done() } + /// Create a builder for ReplayBufCreator with the default options. pub fn build() -> ReplayBufCreatorBuilder { ReplayBufCreatorBuilder { segment_size: DEFAULT_SEGMENT_SIZE.unwrap(), } } + /// Returns the created ReplayBuf. pub fn result(mut self) -> Result { self.flush()?; Ok(self.buf) @@ -145,17 +151,20 @@ impl Write for ReplayBufCreator { // --- +/// A builder for ReplayBufCreator. pub struct ReplayBufCreatorBuilder { segment_size: NonZeroUsize, } impl ReplayBufCreatorBuilder { + /// Set the segment size for the ReplayBuf. #[allow(dead_code)] pub fn segment_size(mut self, segment_size: NonZeroUsize) -> Self { self.segment_size = segment_size; self } + /// Create a new ReplayBufCreator with the current configuration. pub fn done(self) -> ReplayBufCreator { ReplayBufCreator { buf: ReplayBuf::new(self.segment_size), @@ -166,6 +175,7 @@ impl ReplayBufCreatorBuilder { // --- +/// A trait that allows reading from a ReplayBuf. pub trait ReplayBufRead { fn segment_size(&self) -> NonZeroUsize; fn size(&self) -> usize; @@ -188,6 +198,7 @@ impl ReplayBufRead for &B { // --- +/// A Read implementation that reads from a ReplayBuf. pub struct ReplayBufReader { buf: B, cache: C, @@ -195,10 +206,12 @@ pub struct ReplayBufReader { } impl ReplayBufReader> { + /// Create a new ReplayBufReader with a default cache. pub fn new(buf: B) -> Self { Self::build(buf).done() } + /// Create a builder for ReplayBufReader with a default cache. pub fn build(buf: B) -> ReplayBufReaderBuilder> { ReplayBufReaderBuilder { buf, @@ -281,6 +294,7 @@ impl> Seek for ReplayBufReader { // --- +/// A builder for ReplayBufReader. pub struct ReplayBufReaderBuilder { buf: B, cache: C, @@ -288,6 +302,7 @@ pub struct ReplayBufReaderBuilder { } impl ReplayBufReaderBuilder { + /// Set the cache to use for uncompressed segments read from ReplayBuf. #[allow(dead_code)] pub fn cache(self, cache: C2) -> ReplayBufReaderBuilder { ReplayBufReaderBuilder { @@ -297,11 +312,13 @@ impl ReplayBufReaderBuilder { } } + /// Set the position to start reading from. #[allow(dead_code)] pub fn position(self, position: usize) -> Self { Self { position, ..self } } + /// Create a new ReplayBufReader with the current configuration. pub fn done(self) -> ReplayBufReader { ReplayBufReader { buf: self.buf, @@ -313,6 +330,9 @@ impl ReplayBufReaderBuilder { // --- +/// A Read + Seek implementation that reads from a Read source and caches the +/// data in a ReplayBuf. It supports seeking to any position and reading from there. It is useful +/// for reading from a source that does not support seeking, but where seeking is required. pub struct ReplaySeekReader { r: R, w: ReplayBufCreator, @@ -322,10 +342,12 @@ pub struct ReplaySeekReader { } impl ReplaySeekReader> { + /// Create a new ReplaySeekReader with a default LRU cache. pub fn new(r: R) -> Self { Self::build(r).done() } + /// Create a builder for ReplaySeekReader with a default LRU cache. pub fn build(r: R) -> ReplaySeekReaderBuilder> { ReplaySeekReaderBuilder { r, @@ -428,6 +450,7 @@ impl> Seek for ReplaySeekReader { // --- +/// A builder for ReplaySeekReader. pub struct ReplaySeekReaderBuilder { r: R, w: ReplayBufCreatorBuilder, @@ -435,6 +458,7 @@ pub struct ReplaySeekReaderBuilder { } impl ReplaySeekReaderBuilder { + /// Set the cache to use for uncompressed segments read from ReplayBuf. #[allow(dead_code)] pub fn cache(self, cache: C2) -> ReplaySeekReaderBuilder { ReplaySeekReaderBuilder { @@ -444,12 +468,14 @@ impl ReplaySeekReaderBuilder { } } + /// Set the segment size for the ReplayBuf. #[allow(dead_code)] pub fn segment_size(mut self, segment_size: NonZeroUsize) -> Self { self.w.segment_size = segment_size; self } + /// Create a new ReplaySeekReader with the current configuration. pub fn done(self) -> ReplaySeekReader { ReplaySeekReader { r: self.r, @@ -463,16 +489,19 @@ impl ReplaySeekReaderBuilder { // --- +/// A buffer that can be compressed and decompressed in segments. #[derive(Default)] pub struct CompressedBuf(Vec); impl CompressedBuf { + /// Create a new CompressedBuf from the given data. pub fn new(data: &[u8]) -> Result { let mut encoded = Vec::new(); FrameEncoder::new(&mut encoded).write_all(data)?; Ok(Self(encoded)) } + /// Decode the compressed data into the given buffer. pub fn decode(&self, buf: &mut [u8]) -> Result { FrameDecoder::new(&self.0[..]).read_fill(buf) } @@ -547,6 +576,7 @@ impl ReusableBuf { // --- +/// Minimal cache implementation that caches a single value. pub struct MinimalCache { data: Option<(Key, Buf)>, } @@ -576,6 +606,7 @@ impl Default for MinimalCache { // --- +/// LRU cache implementation that caches up to a given number of values. pub struct LruCache { limit: usize, data: BTreeMap<(Instant, Key), Buf>, @@ -584,6 +615,7 @@ pub struct LruCache { #[allow(dead_code)] impl LruCache { + /// Create a new LruCache with the given limit. pub fn new(limit: usize) -> Self { Self { limit, diff --git a/src/vfs.rs b/src/vfs.rs new file mode 100644 index 00000000..22a11898 --- /dev/null +++ b/src/vfs.rs @@ -0,0 +1,437 @@ +// stdlib imports +use std::{ + fs, + io::{self, Read, Seek, Write}, + path::{Path, PathBuf}, +}; + +// third-party imports +#[cfg(test)] +use mockall::mock; + +// --- + +pub trait FileSystem { + type Metadata: 'static; + + fn canonicalize(&self, path: &Path) -> io::Result; + fn metadata(&self, path: &Path) -> io::Result; + fn exists(&self, path: &Path) -> io::Result; + fn open(&self, path: &Path) -> io::Result + Send + Sync>>; + fn create(&self, path: &Path) -> io::Result + Send + Sync>>; +} + +#[cfg(test)] +mock! { + pub FileSystem {} + + impl FileSystem for FileSystem { + type Metadata = M; + + fn canonicalize(&self, path: &Path) -> io::Result; + fn metadata(&self, path: &Path) -> io::Result; + fn exists(&self, path: &Path) -> io::Result; + fn open(&self, path: &Path) -> io::Result + Send + Sync>>; + fn create(&self, path: &Path) -> io::Result + Send + Sync>>; + } +} + +macro_rules! delegate_fs_methods { + () => { + #[inline] + fn canonicalize(&self, path: &Path) -> io::Result { + (**self).canonicalize(path) + } + + #[inline] + fn metadata(&self, path: &Path) -> io::Result { + (**self).metadata(path) + } + + #[inline] + fn exists(&self, path: &Path) -> io::Result { + (**self).exists(path) + } + + #[inline] + fn open(&self, path: &Path) -> io::Result + Send + Sync>> { + (**self).open(path) + } + + #[inline] + fn create(&self, path: &Path) -> io::Result + Send + Sync>> { + (**self).create(path) + } + }; +} + +impl<'a, T> FileSystem for &'a T +where + T: FileSystem, +{ + type Metadata = T::Metadata; + delegate_fs_methods!(); +} + +impl FileSystem for std::sync::Arc +where + T: FileSystem, +{ + type Metadata = T::Metadata; + delegate_fs_methods!(); +} + +// --- + +pub trait Meta { + type Metadata; + + fn metadata(&self) -> io::Result; +} + +impl Meta for fs::File { + type Metadata = fs::Metadata; + + #[inline] + fn metadata(&self) -> io::Result { + self.metadata() + } +} + +// --- + +pub trait FileRead: Read + Seek + Meta {} + +impl FileRead for T {} + +#[cfg(test)] +mock! { + pub FileRead {} + + impl Read for FileRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result; + } + + impl Seek for FileRead { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result; + } + + impl Meta for FileRead { + type Metadata = M; + + fn metadata(&self) -> io::Result; + } +} + +// --- + +pub trait FileReadWrite: FileRead + Write {} + +impl FileReadWrite for T {} + +#[cfg(test)] +mock! { + pub FileReadWrite {} + + impl Read for FileReadWrite { + fn read(&mut self, buf: &mut [u8]) -> io::Result; + } + + impl Seek for FileReadWrite { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result; + } + + impl Meta for FileReadWrite { + type Metadata = M; + + fn metadata(&self) -> io::Result; + } + + impl Write for FileReadWrite { + fn write(&mut self, buf: &[u8]) -> io::Result; + fn flush(&mut self) -> io::Result<()>; + } +} + +// --- + +#[derive(Default)] +pub struct LocalFileSystem; + +impl FileSystem for LocalFileSystem { + type Metadata = fs::Metadata; + + #[inline] + fn canonicalize(&self, path: &Path) -> io::Result { + fs::canonicalize(path) + } + + #[inline] + fn metadata(&self, path: &Path) -> io::Result { + fs::metadata(path) + } + + #[inline] + fn exists(&self, path: &Path) -> io::Result { + fs::exists(path) + } + + #[inline] + fn open(&self, path: &Path) -> io::Result + Send + Sync>> { + Ok(Box::new(fs::File::open(path)?)) + } + + #[inline] + fn create(&self, path: &Path) -> io::Result + Send + Sync>> { + Ok(Box::new(fs::File::create(path)?)) + } +} + +// --- + +#[cfg(test)] +pub mod mem { + use super::{FileRead, Meta}; + + use std::{ + collections::HashMap, + io::{self, Read, Seek, Write}, + path::{Path, PathBuf}, + sync::{Arc, RwLock}, + time::SystemTime, + }; + + use clean_path::Clean; + + // --- + + #[derive(Copy, Clone, Debug)] + #[allow(dead_code)] + pub struct Metadata { + pub len: usize, + pub created: SystemTime, + pub modified: SystemTime, + } + + #[derive(Copy, Clone)] + struct InternalMetadata { + pub created: SystemTime, + pub modified: SystemTime, + } + + impl InternalMetadata { + #[inline] + fn new() -> Self { + let now = SystemTime::now(); + + Self { + created: now, + modified: now, + } + } + } + + impl From<(usize, InternalMetadata)> for Metadata { + #[inline] + fn from(meta: (usize, InternalMetadata)) -> Self { + Self { + len: meta.0, + created: meta.1.created, + modified: meta.1.modified, + } + } + } + + struct File { + data: Vec, + meta: InternalMetadata, + } + + impl File { + #[inline] + fn new(data: Vec) -> Self { + Self { + data, + meta: InternalMetadata::new(), + } + } + + #[inline] + fn metadata(&self) -> Metadata { + (self.data.len(), self.meta).into() + } + } + + // --- + + struct FileCursor { + file: Arc>, + pos: usize, + } + + impl FileCursor { + #[inline] + fn new(file: Arc>) -> Self { + Self { file, pos: 0 } + } + } + + impl Read for FileCursor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let data = &self.file.read().unwrap().data; + let len = buf.len().min(data.len() - self.pos); + buf[..len].copy_from_slice(&data[self.pos..self.pos + len]); + self.pos += len; + Ok(len) + } + } + + impl Write for FileCursor { + fn write(&mut self, buf: &[u8]) -> io::Result { + let file = &mut self.file.write().unwrap(); + let data = &mut file.data; + if self.pos + buf.len() > data.len() { + data.resize(self.pos + buf.len(), 0); + } + data[self.pos..self.pos + buf.len()].copy_from_slice(buf); + file.meta.modified = SystemTime::now(); + self.pos += buf.len(); + Ok(buf.len()) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + impl Seek for FileCursor { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let new_pos = match pos { + io::SeekFrom::Start(offset) => offset as usize, + io::SeekFrom::Current(offset) => (self.pos as i64 + offset) as usize, + io::SeekFrom::End(offset) => (self.file.read().unwrap().data.len() as i64 + offset) as usize, + }; + self.pos = new_pos; + Ok(new_pos as u64) + } + } + + impl Meta for FileCursor { + type Metadata = Metadata; + + #[inline] + fn metadata(&self) -> io::Result { + Ok(self.file.read().unwrap().metadata()) + } + } + + // --- + + #[derive(Default)] + pub struct FileSystem { + files: RwLock>>>, + } + + impl FileSystem { + pub fn new() -> Self { + FileSystem { + files: RwLock::new(HashMap::new()), + } + } + } + + impl super::FileSystem for FileSystem { + type Metadata = Metadata; + + fn canonicalize(&self, path: &Path) -> io::Result { + Ok(PathBuf::from("/tmp").join(path).clean()) + } + + fn metadata(&self, path: &Path) -> io::Result { + let path = self.canonicalize(path)?; + let files = self.files.read().unwrap(); + if let Some(file) = files.get(&path) { + Ok(file.read().unwrap().metadata()) + } else { + Err(io::Error::new(io::ErrorKind::NotFound, "file not found")) + } + } + + fn exists(&self, path: &Path) -> io::Result { + let path = self.canonicalize(path)?; + let files = self.files.read().unwrap(); + Ok(files.contains_key(&path)) + } + + fn open(&self, path: &Path) -> io::Result + Send + Sync>> { + let path = self.canonicalize(path)?; + let files = self.files.read().unwrap(); + if let Some(file) = files.get(&path) { + Ok(Box::new(FileCursor::new(file.clone()))) + } else { + Err(io::Error::new(io::ErrorKind::NotFound, "file not found")) + } + } + + fn create( + &self, + path: &Path, + ) -> io::Result + Send + Sync>> { + let path = self.canonicalize(path)?; + let mut files = self.files.write().unwrap(); + if files.contains_key(&path) { + return Err(io::Error::new(io::ErrorKind::AlreadyExists, "file already exists")); + } + let file = Arc::new(RwLock::new(File::new(Vec::new()))); + files.insert(path.clone(), file.clone()); + Ok(Box::new(FileCursor::new(file))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_filesystem() { + let fs = mem::FileSystem::new(); + let path = Path::new("file.txt"); + + assert_eq!(fs.exists(path).unwrap(), false); + + let mut file = fs.create(path).unwrap(); + file.write_all(b"hello world").unwrap(); + file.flush().unwrap(); + + let res = fs.create(path); + assert!(res.is_err()); + assert_eq!(res.err().map(|e| e.kind()), Some(io::ErrorKind::AlreadyExists)); + + assert_eq!(fs.exists(path).unwrap(), true); + + let meta = fs.metadata(path).unwrap(); + assert_eq!(meta.len, 11); + + let res = fs.metadata(Path::new("nonexistent.txt")); + assert!(res.is_err()); + assert!(matches!(res.unwrap_err().kind(), io::ErrorKind::NotFound)); + + let canonical_path = fs.canonicalize(path).unwrap(); + assert_eq!(canonical_path, PathBuf::from("/tmp/file.txt")); + + let mut file = fs.open(path).unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).unwrap(); + + assert_eq!(buf, b"hello world"); + + let meta = file.metadata().unwrap(); + assert_eq!(meta.len, 11); + + let res = fs.open(Path::new("nonexistent.txt")); + assert!(res.is_err()); + assert_eq!(res.err().map(|e| e.kind()), Some(io::ErrorKind::NotFound)); + } +}