From 8c24146a1171413f3e42668f7f56e679b7bce0a0 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Tue, 17 Dec 2024 15:53:16 +0000 Subject: [PATCH] fix the persistent graph materialize issue for disk as well --- Cargo.lock | 39 +++++++++++++-- Cargo.toml | 4 +- pometry-storage-private | 2 +- raphtory/src/disk_graph/graph_impl/tprops.rs | 48 ++++++------------- .../src/disk_graph/storage_interface/node.rs | 13 +++++ 5 files changed, 67 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0cf30a70..283c46e0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,6 +72,7 @@ dependencies = [ "const-random", "getrandom", "once_cell", + "serde", "version_check", "zerocopy", ] @@ -4423,6 +4424,37 @@ dependencies = [ name = "pometry-storage" version = "0.14.0" +[[package]] +name = "pometry-storage-private" +version = "0.12.1" +dependencies = [ + "ahash", + "bincode", + "bytemuck", + "criterion", + "itertools 0.12.1", + "kdam", + "memmap2 0.9.5", + "once_cell", + "parking_lot", + "polars-arrow", + "polars-parquet", + "polars-utils", + "pretty_assertions", + "proptest", + "rand", + "raphtory-api", + "rayon", + "serde", + "serde_json", + "strum", + "tempfile", + "thiserror 1.0.69", + "tracing", + "tracing-subscriber", + "twox-hash 1.6.3", +] + [[package]] name = "portable-atomic" version = "1.10.0" @@ -4859,7 +4891,7 @@ dependencies = [ "polars-core", "polars-io", "polars-parquet", - "pometry-storage", + "pometry-storage-private", "pretty_assertions", "proptest", "prost", @@ -4923,7 +4955,7 @@ dependencies = [ "criterion", "csv", "flate2", - "pometry-storage", + "pometry-storage-private", "rand", "raphtory", "raphtory-api", @@ -4950,7 +4982,7 @@ dependencies = [ "pest", "pest_derive", "polars-arrow", - "pometry-storage", + "pometry-storage-private", "pretty_assertions", "proptest", "raphtory", @@ -6654,6 +6686,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", + "rand", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 3534075e0..51f20a290 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,9 +40,9 @@ debug = 0 [workspace.dependencies] #[public-storage] -pometry-storage = { version = ">=0.8.1", path = "pometry-storage" } +# pometry-storage = { version = ">=0.8.1", path = "pometry-storage" } #[private-storage] -# pometry-storage = { path = "pometry-storage-private", package = "pometry-storage-private" } +pometry-storage = { path = "pometry-storage-private", package = "pometry-storage-private" } async-graphql = { version = "7.0.13", features = ["dynamic-schema"] } bincode = "1.3.3" async-graphql-poem = "7.0.13" diff --git a/pometry-storage-private b/pometry-storage-private index e6bfeac35..131261cf6 160000 --- a/pometry-storage-private +++ b/pometry-storage-private @@ -1 +1 @@ -Subproject commit e6bfeac35e2a4d5d61f166f3645f1f010828518a +Subproject commit 131261cf60daa1a65b0aa2e78bb3dfdc66dbc2ad diff --git a/raphtory/src/disk_graph/graph_impl/tprops.rs b/raphtory/src/disk_graph/graph_impl/tprops.rs index cf678f686..dde4f1e9e 100644 --- a/raphtory/src/disk_graph/graph_impl/tprops.rs +++ b/raphtory/src/disk_graph/graph_impl/tprops.rs @@ -13,10 +13,10 @@ use std::{iter, ops::Range}; impl<'a> TPropOps<'a> for TPropColumn<'a, ChunkedBoolCol<'a>, TimeIndexEntry> { fn last_before(&self, t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> { - let (props, timestamps) = self.into_inner(); - let (t, t_index) = timestamps.last_before(t)?; - let v = props.get(t_index)?; - Some((t, v.into())) + self.iter_window_inner(TimeIndexEntry::MIN..t) + .filter_map(|(t, v)| v.map(|v| (t, v))) + .next_back() + .map(|(t, v)| (t, v.into())) } fn iter(self) -> impl Iterator + Send + 'a { @@ -31,13 +31,7 @@ impl<'a> TPropOps<'a> for TPropColumn<'a, ChunkedBoolCol<'a>, TimeIndexEntry> { self, r: Range, ) -> impl Iterator + Send + 'a { - let (props, timestamps) = self.into_inner(); - let start = timestamps.position(&r.start); - let end = timestamps.position(&r.end); - timestamps - .sliced(start..end) - .into_iter() - .zip(props.sliced(start..end)) + self.iter_window_inner(r) .filter_map(|(t, v)| v.map(|v| (t, v.into()))) } @@ -59,10 +53,10 @@ impl<'a, T: NativeType + Into> TPropOps<'a> for TPropColumn<'a, ChunkedPrimitiveCol<'a, T>, TimeIndexEntry> { fn last_before(&self, t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> { - let (props, timestamps) = self.into_inner(); - let (t, t_index) = timestamps.last_before(t)?; - let v = props.get(t_index)?; - Some((t, v.into())) + self.iter_window_inner(TimeIndexEntry::MIN..t) + .filter_map(|(t, v)| v.map(|v| (t, v))) + .next_back() + .map(|(t, v)| (t, v.into())) } fn iter(self) -> impl Iterator + Send + 'a { @@ -77,13 +71,7 @@ impl<'a, T: NativeType + Into> TPropOps<'a> self, r: Range, ) -> impl Iterator + Send + 'a { - let (props, timestamps) = self.into_inner(); - let start = timestamps.position(&r.start); - let end = timestamps.position(&r.end); - timestamps - .sliced(start..end) - .into_iter() - .zip(props.sliced(start..end)) + self.iter_window_inner(r) .filter_map(|(t, v)| v.map(|v| (t, v.into()))) } @@ -103,10 +91,10 @@ impl<'a, T: NativeType + Into> TPropOps<'a> impl<'a, I: Offset> TPropOps<'a> for TPropColumn<'a, StringCol<'a, I>, TimeIndexEntry> { fn last_before(&self, t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> { - let (props, timestamps) = self.into_inner(); - let (t, t_index) = timestamps.last_before(t)?; - let v = props.get(t_index)?; - Some((t, v.into())) + self.iter_window_inner(TimeIndexEntry::MIN..t) + .filter_map(|(t, v)| v.map(|v| (t, v))) + .next_back() + .map(|(t, v)| (t, v.into())) } fn iter(self) -> impl Iterator + Send + 'a { @@ -121,13 +109,7 @@ impl<'a, I: Offset> TPropOps<'a> for TPropColumn<'a, StringCol<'a, I>, TimeIndex self, r: Range, ) -> impl Iterator + Send + 'a { - let (props, timestamps) = self.into_inner(); - let start = timestamps.position(&r.start); - let end = timestamps.position(&r.end); - timestamps - .sliced(start..end) - .into_iter() - .zip(props.sliced(start..end)) + self.iter_window_inner(r) .filter_map(|(t, v)| v.map(|v| (t, v.into()))) } diff --git a/raphtory/src/disk_graph/storage_interface/node.rs b/raphtory/src/disk_graph/storage_interface/node.rs index 79ff29802..0f2c00b30 100644 --- a/raphtory/src/disk_graph/storage_interface/node.rs +++ b/raphtory/src/disk_graph/storage_interface/node.rs @@ -65,6 +65,19 @@ impl<'a> DiskNode<'a> { }) } + pub fn last_before_row(self, t: TimeIndexEntry) -> Vec<(usize, Prop)> { + self.graph + .node_properties() + .temporal_props() + .into_iter() + .enumerate() + .filter_map(|(layer, props)| { + let ts = props.timestamps::(self.vid); + let idx = ts.last_before(t)?; + }) + .collect() + } + pub fn constant_node_prop_ids(self) -> BoxedLIter<'a, usize> { match &self.graph.node_properties().const_props { None => Box::new(std::iter::empty()),