Skip to content

Commit

Permalink
fix the persistent graph materialize issue for disk as well
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Dec 17, 2024
1 parent 5a7758b commit 8c24146
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 39 deletions.
39 changes: 36 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ debug = 0

[workspace.dependencies]
#[public-storage]
pometry-storage = { version = ">=0.8.1", path = "pometry-storage" }
# pometry-storage = { version = ">=0.8.1", path = "pometry-storage" }
#[private-storage]
# pometry-storage = { path = "pometry-storage-private", package = "pometry-storage-private" }
pometry-storage = { path = "pometry-storage-private", package = "pometry-storage-private" }
async-graphql = { version = "7.0.13", features = ["dynamic-schema"] }
bincode = "1.3.3"
async-graphql-poem = "7.0.13"
Expand Down
2 changes: 1 addition & 1 deletion pometry-storage-private
48 changes: 15 additions & 33 deletions raphtory/src/disk_graph/graph_impl/tprops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use std::{iter, ops::Range};

impl<'a> TPropOps<'a> for TPropColumn<'a, ChunkedBoolCol<'a>, TimeIndexEntry> {
fn last_before(&self, t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> {
let (props, timestamps) = self.into_inner();
let (t, t_index) = timestamps.last_before(t)?;
let v = props.get(t_index)?;
Some((t, v.into()))
self.iter_window_inner(TimeIndexEntry::MIN..t)
.filter_map(|(t, v)| v.map(|v| (t, v)))
.next_back()
.map(|(t, v)| (t, v.into()))
}

fn iter(self) -> impl Iterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
Expand All @@ -31,13 +31,7 @@ impl<'a> TPropOps<'a> for TPropColumn<'a, ChunkedBoolCol<'a>, TimeIndexEntry> {
self,
r: Range<TimeIndexEntry>,
) -> impl Iterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
let (props, timestamps) = self.into_inner();
let start = timestamps.position(&r.start);
let end = timestamps.position(&r.end);
timestamps
.sliced(start..end)
.into_iter()
.zip(props.sliced(start..end))
self.iter_window_inner(r)
.filter_map(|(t, v)| v.map(|v| (t, v.into())))
}

Expand All @@ -59,10 +53,10 @@ impl<'a, T: NativeType + Into<Prop>> TPropOps<'a>
for TPropColumn<'a, ChunkedPrimitiveCol<'a, T>, TimeIndexEntry>
{
fn last_before(&self, t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> {
let (props, timestamps) = self.into_inner();
let (t, t_index) = timestamps.last_before(t)?;
let v = props.get(t_index)?;
Some((t, v.into()))
self.iter_window_inner(TimeIndexEntry::MIN..t)
.filter_map(|(t, v)| v.map(|v| (t, v)))
.next_back()
.map(|(t, v)| (t, v.into()))
}

fn iter(self) -> impl Iterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
Expand All @@ -77,13 +71,7 @@ impl<'a, T: NativeType + Into<Prop>> TPropOps<'a>
self,
r: Range<TimeIndexEntry>,
) -> impl Iterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
let (props, timestamps) = self.into_inner();
let start = timestamps.position(&r.start);
let end = timestamps.position(&r.end);
timestamps
.sliced(start..end)
.into_iter()
.zip(props.sliced(start..end))
self.iter_window_inner(r)
.filter_map(|(t, v)| v.map(|v| (t, v.into())))
}

Expand All @@ -103,10 +91,10 @@ impl<'a, T: NativeType + Into<Prop>> TPropOps<'a>

impl<'a, I: Offset> TPropOps<'a> for TPropColumn<'a, StringCol<'a, I>, TimeIndexEntry> {
fn last_before(&self, t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> {
let (props, timestamps) = self.into_inner();
let (t, t_index) = timestamps.last_before(t)?;
let v = props.get(t_index)?;
Some((t, v.into()))
self.iter_window_inner(TimeIndexEntry::MIN..t)
.filter_map(|(t, v)| v.map(|v| (t, v)))
.next_back()
.map(|(t, v)| (t, v.into()))
}

fn iter(self) -> impl Iterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
Expand All @@ -121,13 +109,7 @@ impl<'a, I: Offset> TPropOps<'a> for TPropColumn<'a, StringCol<'a, I>, TimeIndex
self,
r: Range<TimeIndexEntry>,
) -> impl Iterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
let (props, timestamps) = self.into_inner();
let start = timestamps.position(&r.start);
let end = timestamps.position(&r.end);
timestamps
.sliced(start..end)
.into_iter()
.zip(props.sliced(start..end))
self.iter_window_inner(r)
.filter_map(|(t, v)| v.map(|v| (t, v.into())))
}

Expand Down
13 changes: 13 additions & 0 deletions raphtory/src/disk_graph/storage_interface/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ impl<'a> DiskNode<'a> {
})
}

pub fn last_before_row(self, t: TimeIndexEntry) -> Vec<(usize, Prop)> {
self.graph
.node_properties()
.temporal_props()
.into_iter()
.enumerate()
.filter_map(|(layer, props)| {
let ts = props.timestamps::<TimeIndexEntry>(self.vid);
let idx = ts.last_before(t)?;
})
.collect()
}

pub fn constant_node_prop_ids(self) -> BoxedLIter<'a, usize> {
match &self.graph.node_properties().const_props {
None => Box::new(std::iter::empty()),
Expand Down

0 comments on commit 8c24146

Please sign in to comment.