Skip to content

Commit

Permalink
Merge pull request #123 from pamburus/feature/segment-hashing
Browse files Browse the repository at this point in the history
new: Improved index rebuilding performance
  • Loading branch information
pamburus authored Jan 28, 2024
2 parents 4c397b0 + 5c645ff commit 8127c8b
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 41 deletions.
4 changes: 2 additions & 2 deletions .build/capnp/index.capnp.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"source": "e9a1ab0d52fc8a798553feddc01dd0d3221ea2551134bba7993598f4171300af",
"target": "fc0896783f60725dfcde8665bbb81f0fa1385d4b7cda23037ea282219a305e9c"
"source": "7cafdecc6197b3fe7978bcfe649598aa26938d097c052f9eefe37676579ddb9f",
"target": "47b895a77c88757bb44bb6061d0aefd0b24636e10eb078a1b3f65acf8e6398e2"
}
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ categories = ["command-line-utilities"]
description = "Utility for viewing json-formatted log files."
keywords = ["cli", "human", "log"]
name = "hl"
version = "0.25.1-beta.2"
version = "0.25.1-beta.3"
edition = "2021"
build = "build.rs"

Expand All @@ -17,7 +17,6 @@ serde_json = { version = "1", features = ["raw_value"] }
sha2 = "0"

[dependencies]
nu-ansi-term = "0"
atoi = "1"
bincode = "1"
bitmask = "0"
Expand All @@ -43,6 +42,7 @@ humantime = "2"
itertools = "0"
itoa = { version = "1", default-features = false }
notify = { version = "6", features = ["macos_kqueue"] }
nu-ansi-term = "0"
num_cpus = "1"
once_cell = "1"
pest = "2"
Expand All @@ -60,6 +60,7 @@ snap = "1"
thiserror = "1"
wildmatch = "2"
winapi = {version = "0", features = ["handleapi"]}
wyhash = "0"

[target.'cfg(target_os = "macos")'.dependencies]
kqueue = "1"
Expand Down
14 changes: 14 additions & 0 deletions schema/index.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct SourceBlock {
size @1 :UInt32;
index @2 :Index;
chronology @3 :Chronology;
hash @4 :Hash;
}

# Index holds index information of a block or a whole file.
Expand Down Expand Up @@ -65,6 +66,19 @@ struct Chronology {
jumps @3 :List(UInt32);
}

# HashAlgorithm identifies the algorithm that was used to calculate a data hash.
# NOTE(review): the Rust index code visible alongside this schema only writes and
# reads `sha256` and `wyHash`; `gxHash64` is declared but is neither saved nor
# loaded there — confirm whether that is intentional.
enum HashAlgorithm {
sha256 @0;
gxHash64 @1;
wyHash @2;
}

# Hash is a hash of some data, tagged with the algorithm that produced it.
struct Hash {
# Algorithm used to calculate `value`.
algorithm @0 :HashAlgorithm;
# Raw hash bytes. The Rust index code stores 32 bytes for `sha256` and
# 8 big-endian bytes for `wyHash`; values of any other length are ignored on load.
value @1 :Data;
}

# Various flags.
const flagLevelDebug :UInt64 = 0x0000000000000001;
const flagLevelInfo :UInt64 = 0x0000000000000002;
Expand Down
121 changes: 103 additions & 18 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ pub type Reader = dyn Read + Send + Sync;

// ---

/// Hash of a chunk of data, tagged with the algorithm that produced it.
///
/// Stored per source block in the index so that a block whose hash is unchanged
/// can reuse its previously computed statistics instead of being re-parsed.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub enum Hash {
/// SHA-256 digest (32 bytes).
Sha256(GenericArray<u8, U32>),
/// 64-bit GxHash value.
/// NOTE(review): this variant is not persisted by `save_hash` and is never
/// produced by `load_hash` in this revision — confirm that is intended.
GxHash64(u64),
/// 64-bit WyHash value; this is the algorithm used when indexing segments.
WyHash(u64),
}

// ---

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub struct Timestamp {
pub sec: i64,
Expand Down Expand Up @@ -148,6 +157,7 @@ impl Indexer {
let meta = source_path.metadata()?;
let hash = hex::encode(sha256(source_path.to_string_lossy().as_bytes()));
let index_path = self.dir.join(PathBuf::from(hash));
let mut existing_index = None;
if Path::new(&index_path).exists() {
let mut file = match File::open(&index_path) {
Ok(file) => file,
Expand All @@ -162,10 +172,11 @@ impl Indexer {
if meta.len() == index.source().size && ts(meta.modified()?) == index.source().modified {
return Ok(index);
}
existing_index = Some(index)
}
}

self.build_index(&source_path, &index_path)
self.build_index(&source_path, &index_path, existing_index)
}

/// Builds index for the given stream.
Expand All @@ -180,10 +191,11 @@ impl Indexer {
},
input,
&mut std::io::sink(),
None,
)
}

fn build_index(&self, source_path: &PathBuf, index_path: &PathBuf) -> Result<Index> {
fn build_index(&self, source_path: &PathBuf, index_path: &PathBuf, existing_index: Option<Index>) -> Result<Index> {
let mut input = match Input::open(&source_path) {
Ok(input) => input,
Err(err) => {
Expand Down Expand Up @@ -211,7 +223,13 @@ impl Indexer {
});
}
};
self.process_file(&source_path, (&metadata).try_into()?, &mut input.stream, &mut output)
self.process_file(
&source_path,
(&metadata).try_into()?,
&mut input.stream,
&mut output,
existing_index,
)
}

fn process_file(
Expand All @@ -220,6 +238,7 @@ impl Indexer {
metadata: Metadata,
input: &mut Reader,
output: &mut Writer,
existing_index: Option<Index>,
) -> Result<Index> {
let n = self.concurrency;
let sfi = Arc::new(SegmentBufFactory::new(self.buffer_size.try_into()?));
Expand All @@ -229,14 +248,14 @@ impl Indexer {
// prepare receive/transmit channels for output data
let (txo, rxo): (Vec<_>, Vec<_>) = (0..n)
.into_iter()
.map(|_| channel::bounded::<(usize, Stat, Chronology)>(1))
.map(|_| channel::bounded::<(usize, Stat, Chronology, Option<Hash>)>(1))
.unzip();
// spawn reader thread
let reader = scope.spawn(closure!(clone sfi, |_| -> Result<()> {
let mut sn: usize = 0;
let scanner = Scanner::new(sfi, b'\n');
for item in scanner.items(input).with_max_segment_size(self.max_message_size.try_into()?) {
if let Err(_) = txi[sn % n].send(item?) {
if let Err(_) = txi[sn % n].send((sn, item?)) {
break;
}
sn += 1;
Expand All @@ -245,19 +264,27 @@ impl Indexer {
}));
// spawn processing threads
for (rxi, txo) in izip!(rxi, txo) {
scope.spawn(closure!(ref sfi, |_| {
for segment in rxi.iter() {
let ((stat, chronology), segment) = match segment {
Segment::Complete(segment) => (self.process_segment(&segment), segment),
scope.spawn(closure!(ref sfi, ref existing_index, |_| {
for (sn, segment) in rxi.iter() {
let (stat, chronology, segment, hash) = match segment {
Segment::Complete(segment) => {
let hash = Hash::WyHash(wyhash::wyhash(segment.data(), 0));
let (stat, chronology) = existing_index
.as_ref()
.and_then(|index| Self::match_segment(&index, sn, &hash))
.map(|(stat, chronology)| (stat, chronology))
.unwrap_or_else(|| self.process_segment(&segment));
(stat, chronology, segment, Some(hash))
}
Segment::Incomplete(segment, _) => {
let mut stat = Stat::new();
stat.add_invalid();
((stat, Chronology::default()), segment)
(stat, Chronology::default(), segment, None)
}
};
let size = segment.data().len();
sfi.recycle(segment);
if let Err(_) = txo.send((size, stat, chronology)) {
if let Err(_) = txo.send((size, stat, chronology, hash)) {
break;
};
}
Expand All @@ -280,12 +307,15 @@ impl Indexer {
let mut offset: u64 = 0;
loop {
match rxo[sn % n].recv() {
Ok((size, stat, chronology)) => {
Ok((size, stat, chronology, hash)) => {
index.source.stat.merge(&stat);
index
.source
.blocks
.push(SourceBlock::new(offset, size.try_into()?, stat, chronology));
index.source.blocks.push(SourceBlock::new(
offset,
size.try_into()?,
stat,
chronology,
hash,
));
offset += u64::try_from(size)?;
}
Err(RecvError) => {
Expand Down Expand Up @@ -385,6 +415,18 @@ impl Indexer {
};
(stat, chronology)
}

/// Tries to reuse previously computed data for the segment number `sn`.
///
/// Returns a copy of the statistics and chronology of the block stored at
/// position `sn` in `index`, but only if that block carries a hash equal to
/// `hash`. Returns `None` when there is no block at that position, when the
/// block has no stored hash, or when the hashes differ.
fn match_segment(index: &Index, sn: usize, hash: &Hash) -> Option<(Stat, Chronology)> {
    let candidate = index.source().blocks.get(sn)?;
    match candidate.hash.as_ref() {
        Some(stored) if stored == hash => Some((candidate.stat.clone(), candidate.chronology.clone())),
        _ => None,
    }
}
}

// ---
Expand Down Expand Up @@ -488,6 +530,7 @@ impl Index {
size: block.get_size(),
stat: Self::load_stat(block.get_index()?),
chronology: Self::load_chronology(block.get_chronology()?)?,
hash: Self::load_hash(block.get_hash()?)?,
})
}
Ok(result)
Expand All @@ -500,7 +543,8 @@ impl Index {
block.set_offset(source_block.offset);
block.set_size(source_block.size);
Self::save_stat(block.reborrow().init_index(), &source_block.stat);
Self::save_chronology(block.init_chronology(), &source_block.chronology)?;
Self::save_chronology(block.reborrow().init_chronology(), &source_block.chronology)?;
Self::save_hash(block.init_hash(), &source_block.hash)?;
}
Ok(())
}
Expand Down Expand Up @@ -566,6 +610,45 @@ impl Index {
}
Ok(())
}

/// Decodes a block hash from its serialized form.
///
/// Returns `Ok(None)` when the algorithm tag is not recognized, when the
/// algorithm is `GxHash64` (not loaded here), or when the stored value does not
/// have the length expected for the algorithm (32 bytes for SHA-256, 8 bytes
/// for WyHash). WyHash values are read as big-endian.
///
/// # Errors
///
/// Propagates the error from reading the value field for SHA-256 and WyHash.
fn load_hash(hash: schema::hash::Reader) -> Result<Option<Hash>> {
    let algorithm = match hash.get_algorithm() {
        Ok(algorithm) => algorithm,
        // An unknown algorithm tag is treated as "no hash".
        Err(_) => return Ok(None),
    };

    let loaded = match algorithm {
        schema::HashAlgorithm::Sha256 => {
            let bytes = hash.get_value()?;
            if bytes.len() != 32 {
                None
            } else {
                Some(Hash::Sha256(*GenericArray::from_slice(bytes)))
            }
        }
        schema::HashAlgorithm::WyHash => {
            let bytes = hash.get_value()?;
            // The conversion succeeds only for exactly 8 bytes.
            <[u8; 8]>::try_from(bytes)
                .ok()
                .map(|raw| Hash::WyHash(u64::from_be_bytes(raw)))
        }
        schema::HashAlgorithm::GxHash64 => None,
    };

    Ok(loaded)
}

/// Encodes an optional block hash into its serialized form.
///
/// SHA-256 digests are written as-is and WyHash values as 8 big-endian bytes.
/// Nothing is written for `None` or for `GxHash64`, leaving the builder
/// untouched in those cases. Always returns `Ok(())`.
fn save_hash(mut to: schema::hash::Builder, from: &Option<Hash>) -> Result<()> {
    if let Some(hash) = from {
        match hash {
            Hash::Sha256(digest) => {
                to.set_algorithm(schema::HashAlgorithm::Sha256);
                to.set_value(digest.as_slice());
            }
            Hash::WyHash(number) => {
                to.set_algorithm(schema::HashAlgorithm::WyHash);
                to.set_value(&number.to_be_bytes());
            }
            // Not persisted.
            Hash::GxHash64(_) => {}
        }
    }
    Ok(())
}
}

// ---
Expand All @@ -589,16 +672,18 @@ pub struct SourceBlock {
pub size: u32,
pub stat: Stat,
pub chronology: Chronology,
pub hash: Option<Hash>,
}

impl SourceBlock {
/// Returns a new SourceBlock.
pub fn new(offset: u64, size: u32, stat: Stat, chronology: Chronology) -> Self {
pub fn new(offset: u64, size: u32, stat: Stat, chronology: Chronology, hash: Option<Hash>) -> Self {
Self {
offset,
size,
stat,
chronology,
hash,
}
}

Expand Down
Loading

0 comments on commit 8127c8b

Please sign in to comment.