Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new: Improved index rebuilding performance #123

Merged
1 commit merged
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .build/capnp/index.capnp.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"source": "e9a1ab0d52fc8a798553feddc01dd0d3221ea2551134bba7993598f4171300af",
"target": "fc0896783f60725dfcde8665bbb81f0fa1385d4b7cda23037ea282219a305e9c"
"source": "7cafdecc6197b3fe7978bcfe649598aa26938d097c052f9eefe37676579ddb9f",
"target": "47b895a77c88757bb44bb6061d0aefd0b24636e10eb078a1b3f65acf8e6398e2"
}
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ categories = ["command-line-utilities"]
description = "Utility for viewing json-formatted log files."
keywords = ["cli", "human", "log"]
name = "hl"
version = "0.25.1-beta.2"
version = "0.25.1-beta.3"
edition = "2021"
build = "build.rs"

Expand All @@ -17,7 +17,6 @@ serde_json = { version = "1", features = ["raw_value"] }
sha2 = "0"

[dependencies]
nu-ansi-term = "0"
atoi = "1"
bincode = "1"
bitmask = "0"
Expand All @@ -43,6 +42,7 @@ humantime = "2"
itertools = "0"
itoa = { version = "1", default-features = false }
notify = { version = "6", features = ["macos_kqueue"] }
nu-ansi-term = "0"
num_cpus = "1"
once_cell = "1"
pest = "2"
Expand All @@ -60,6 +60,7 @@ snap = "1"
thiserror = "1"
wildmatch = "2"
winapi = {version = "0", features = ["handleapi"]}
wyhash = "0"

[target.'cfg(target_os = "macos")'.dependencies]
kqueue = "1"
Expand Down
14 changes: 14 additions & 0 deletions schema/index.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct SourceBlock {
size @1 :UInt32;
index @2 :Index;
chronology @3 :Chronology;
hash @4 :Hash;
}

# Index holds index information of a block or a whole file.
Expand Down Expand Up @@ -65,6 +66,19 @@ struct Chronology {
jumps @3 :List(UInt32);
}

# HashAlgorithm is an algorithm used to calculate data hash.
enum HashAlgorithm {
sha256 @0;
gxHash64 @1;
wyHash @2;
}

# Hash is a hash of some data.
struct Hash {
algorithm @0 :HashAlgorithm;
value @1 :Data;
}

# Various flags.
const flagLevelDebug :UInt64 = 0x0000000000000001;
const flagLevelInfo :UInt64 = 0x0000000000000002;
Expand Down
121 changes: 103 additions & 18 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ pub type Reader = dyn Read + Send + Sync;

// ---

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub enum Hash {
Sha256(GenericArray<u8, U32>),
GxHash64(u64),
WyHash(u64),
}

// ---

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub struct Timestamp {
pub sec: i64,
Expand Down Expand Up @@ -148,6 +157,7 @@ impl Indexer {
let meta = source_path.metadata()?;
let hash = hex::encode(sha256(source_path.to_string_lossy().as_bytes()));
let index_path = self.dir.join(PathBuf::from(hash));
let mut existing_index = None;
if Path::new(&index_path).exists() {
let mut file = match File::open(&index_path) {
Ok(file) => file,
Expand All @@ -162,10 +172,11 @@ impl Indexer {
if meta.len() == index.source().size && ts(meta.modified()?) == index.source().modified {
return Ok(index);
}
existing_index = Some(index)
}
}

self.build_index(&source_path, &index_path)
self.build_index(&source_path, &index_path, existing_index)
}

/// Builds index for the given stream.
Expand All @@ -180,10 +191,11 @@ impl Indexer {
},
input,
&mut std::io::sink(),
None,
)
}

fn build_index(&self, source_path: &PathBuf, index_path: &PathBuf) -> Result<Index> {
fn build_index(&self, source_path: &PathBuf, index_path: &PathBuf, existing_index: Option<Index>) -> Result<Index> {
let mut input = match Input::open(&source_path) {
Ok(input) => input,
Err(err) => {
Expand Down Expand Up @@ -211,7 +223,13 @@ impl Indexer {
});
}
};
self.process_file(&source_path, (&metadata).try_into()?, &mut input.stream, &mut output)
self.process_file(
&source_path,
(&metadata).try_into()?,
&mut input.stream,
&mut output,
existing_index,
)
}

fn process_file(
Expand All @@ -220,6 +238,7 @@ impl Indexer {
metadata: Metadata,
input: &mut Reader,
output: &mut Writer,
existing_index: Option<Index>,
) -> Result<Index> {
let n = self.concurrency;
let sfi = Arc::new(SegmentBufFactory::new(self.buffer_size.try_into()?));
Expand All @@ -229,14 +248,14 @@ impl Indexer {
// prepare receive/transmit channels for output data
let (txo, rxo): (Vec<_>, Vec<_>) = (0..n)
.into_iter()
.map(|_| channel::bounded::<(usize, Stat, Chronology)>(1))
.map(|_| channel::bounded::<(usize, Stat, Chronology, Option<Hash>)>(1))
.unzip();
// spawn reader thread
let reader = scope.spawn(closure!(clone sfi, |_| -> Result<()> {
let mut sn: usize = 0;
let scanner = Scanner::new(sfi, b'\n');
for item in scanner.items(input).with_max_segment_size(self.max_message_size.try_into()?) {
if let Err(_) = txi[sn % n].send(item?) {
if let Err(_) = txi[sn % n].send((sn, item?)) {
break;
}
sn += 1;
Expand All @@ -245,19 +264,27 @@ impl Indexer {
}));
// spawn processing threads
for (rxi, txo) in izip!(rxi, txo) {
scope.spawn(closure!(ref sfi, |_| {
for segment in rxi.iter() {
let ((stat, chronology), segment) = match segment {
Segment::Complete(segment) => (self.process_segment(&segment), segment),
scope.spawn(closure!(ref sfi, ref existing_index, |_| {
for (sn, segment) in rxi.iter() {
let (stat, chronology, segment, hash) = match segment {
Segment::Complete(segment) => {
let hash = Hash::WyHash(wyhash::wyhash(segment.data(), 0));
let (stat, chronology) = existing_index
.as_ref()
.and_then(|index| Self::match_segment(&index, sn, &hash))
.map(|(stat, chronology)| (stat, chronology))
.unwrap_or_else(|| self.process_segment(&segment));
(stat, chronology, segment, Some(hash))
}
Segment::Incomplete(segment, _) => {
let mut stat = Stat::new();
stat.add_invalid();
((stat, Chronology::default()), segment)
(stat, Chronology::default(), segment, None)
}
};
let size = segment.data().len();
sfi.recycle(segment);
if let Err(_) = txo.send((size, stat, chronology)) {
if let Err(_) = txo.send((size, stat, chronology, hash)) {
break;
};
}
Expand All @@ -280,12 +307,15 @@ impl Indexer {
let mut offset: u64 = 0;
loop {
match rxo[sn % n].recv() {
Ok((size, stat, chronology)) => {
Ok((size, stat, chronology, hash)) => {
index.source.stat.merge(&stat);
index
.source
.blocks
.push(SourceBlock::new(offset, size.try_into()?, stat, chronology));
index.source.blocks.push(SourceBlock::new(
offset,
size.try_into()?,
stat,
chronology,
hash,
));
offset += u64::try_from(size)?;
}
Err(RecvError) => {
Expand Down Expand Up @@ -385,6 +415,18 @@ impl Indexer {
};
(stat, chronology)
}

fn match_segment(index: &Index, sn: usize, hash: &Hash) -> Option<(Stat, Chronology)> {
index.source().blocks.get(sn).and_then(|block| {
block.hash.as_ref().and_then(|h| {
if h == hash {
Some((block.stat.clone(), block.chronology.clone()))
} else {
None
}
})
})
}
}

// ---
Expand Down Expand Up @@ -488,6 +530,7 @@ impl Index {
size: block.get_size(),
stat: Self::load_stat(block.get_index()?),
chronology: Self::load_chronology(block.get_chronology()?)?,
hash: Self::load_hash(block.get_hash()?)?,
})
}
Ok(result)
Expand All @@ -500,7 +543,8 @@ impl Index {
block.set_offset(source_block.offset);
block.set_size(source_block.size);
Self::save_stat(block.reborrow().init_index(), &source_block.stat);
Self::save_chronology(block.init_chronology(), &source_block.chronology)?;
Self::save_chronology(block.reborrow().init_chronology(), &source_block.chronology)?;
Self::save_hash(block.init_hash(), &source_block.hash)?;
}
Ok(())
}
Expand Down Expand Up @@ -566,6 +610,45 @@ impl Index {
}
Ok(())
}

fn load_hash(hash: schema::hash::Reader) -> Result<Option<Hash>> {
match hash.get_algorithm().ok() {
Some(schema::HashAlgorithm::Sha256) => {
let value = hash.get_value()?;
if value.len() == 32 {
Ok(Some(Hash::Sha256(*GenericArray::from_slice(value))))
} else {
Ok(None)
}
}
Some(schema::HashAlgorithm::WyHash) => {
let value = hash.get_value()?;
if value.len() == 8 {
Ok(Some(Hash::WyHash(u64::from_be_bytes(value.try_into().unwrap()))))
} else {
Ok(None)
}
}
Some(schema::HashAlgorithm::GxHash64) => Ok(None),
None => Ok(None),
}
}

fn save_hash(mut to: schema::hash::Builder, from: &Option<Hash>) -> Result<()> {
match from {
Some(Hash::Sha256(value)) => {
to.set_algorithm(schema::HashAlgorithm::Sha256);
to.set_value(value.as_slice());
}
Some(Hash::WyHash(value)) => {
to.set_algorithm(schema::HashAlgorithm::WyHash);
to.set_value(&value.to_be_bytes());
}
Some(Hash::GxHash64(_)) => (),
None => (),
}
Ok(())
}
}

// ---
Expand All @@ -589,16 +672,18 @@ pub struct SourceBlock {
pub size: u32,
pub stat: Stat,
pub chronology: Chronology,
pub hash: Option<Hash>,
}

impl SourceBlock {
/// Returns a new SourceBlock.
pub fn new(offset: u64, size: u32, stat: Stat, chronology: Chronology) -> Self {
pub fn new(offset: u64, size: u32, stat: Stat, chronology: Chronology, hash: Option<Hash>) -> Self {
Self {
offset,
size,
stat,
chronology,
hash,
}
}

Expand Down
Loading
Loading