Skip to content

Commit

Permalink
fix materialize issues for persistent graphs
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Dec 17, 2024
1 parent c2b656a commit 5a7758b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 12 deletions.
21 changes: 13 additions & 8 deletions raphtory/src/core/entities/properties/tprop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,22 @@ impl<'a> TPropCell<'a> {
log,
}
}

fn iter_window_inner(
self,
r: Range<TimeIndexEntry>,
) -> impl DoubleEndedIterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
self.t_cell.into_iter().flat_map(move |t_cell| {
t_cell
.iter_window(r.clone())
.filter_map(move |(t, &id)| self.log?.get(id?).map(|prop| (*t, prop)))
})
}
}

impl<'a> TPropOps<'a> for TPropCell<'a> {
fn last_before(&self, t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> {
self.t_cell?
.last_before(t)
.and_then(|(t, &id)| self.log?.get(id?).map(|prop| (t, prop))) // FIXME: is this correct?
self.iter_window_inner(TimeIndexEntry::MIN..t).next_back()
}

fn iter(self) -> impl Iterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
Expand All @@ -72,11 +81,7 @@ impl<'a> TPropOps<'a> for TPropCell<'a> {
self,
r: Range<TimeIndexEntry>,
) -> impl Iterator<Item = (TimeIndexEntry, Prop)> + Send + 'a {
self.t_cell.into_iter().flat_map(move |t_cell| {
t_cell
.iter_window(r.clone())
.filter_map(move |(t, &id)| self.log?.get(id?).map(|prop| (*t, prop)))
})
self.iter_window_inner(r)
}

fn at(self, ti: &TimeIndexEntry) -> Option<Prop> {
Expand Down
11 changes: 11 additions & 0 deletions raphtory/src/core/storage/node_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ impl<'a> NodePtr<'a> {
})
}

pub fn last_before_row(self, t: TimeIndexEntry) -> Vec<(usize, Prop)> {
self.t_props_log
.iter()
.enumerate()
.filter_map(|(prop_id, _)| {
let t_prop = self.t_prop(prop_id);
t_prop.last_before(t).map(|(_, v)| (prop_id, v))
})
.collect()
}

pub fn into_rows_window(
self,
w: Range<TimeIndexEntry>,
Expand Down
8 changes: 8 additions & 0 deletions raphtory/src/db/api/storage/graph/nodes/node_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ impl<'a> NodeStorageRef<'a> {
NodeStorageRef::Disk(disk_node) => disk_node.into_rows_window(window).into_dyn_boxed(),
}
}

pub fn last_before_row(self, t: TimeIndexEntry) -> Vec<(usize, Prop)> {
match self {
NodeStorageRef::Mem(node_entry) => node_entry.last_before_row(t),
#[cfg(feature = "storage")]
NodeStorageRef::Disk(disk_node) => disk_node.last_before_row(t),
}
}
}

impl<'a> From<NodePtr<'a>> for NodeStorageRef<'a> {
Expand Down
22 changes: 18 additions & 4 deletions raphtory/src/db/graph/views/deletion_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,18 @@ impl TimeSemantics for PersistentGraph {
v: VID,
w: Option<Range<i64>>,
) -> BoxedLIter<(TimeIndexEntry, Vec<(usize, Prop)>)> {
self.0.node_history_rows(v, w)
// if window exists, we need to add the first row before the window
if let Some(w) = w {
let t = TimeIndexEntry::start(w.start.saturating_add(1));
let first_row = self.core_node_entry(v).as_ref().last_before_row(t);
Box::new(
std::iter::once(first_row)
.map(move |row| (TimeIndexEntry::start(w.start), row))
.chain(self.0.node_history_rows(v, Some(w))),
)
} else {
self.0.node_history_rows(v, w)
}
}

fn node_property_history(&self, v: VID, w: Option<Range<i64>>) -> BoxedLIter<TimeIndexEntry> {
Expand Down Expand Up @@ -686,6 +697,7 @@ mod test_deletions {
},
},
prelude::*,
test_storage,
};
use itertools::Itertools;
use raphtory_api::core::{entities::GID, utils::logging::global_info_logger};
Expand Down Expand Up @@ -893,9 +905,11 @@ mod test_deletions {
let g = PersistentGraph::new();
g.add_node(0, 1, [("test", "test")], None).unwrap();

let wg = g.window(3, 5);
let mg = wg.materialize().unwrap();
assert_graph_equal(&wg, &mg);
test_storage!(&g, |g| {
let wg = g.window(3, 5);
let mg = wg.materialize().unwrap();
assert_graph_equal(&wg, &mg);
})
}

#[test]
Expand Down

0 comments on commit 5a7758b

Please sign in to comment.