Skip to content

Commit

Permalink
refine the rows API
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Dec 12, 2024
1 parent 8bce638 commit 243595d
Show file tree
Hide file tree
Showing 14 changed files with 3,217 additions and 641 deletions.
3,661 changes: 3,128 additions & 533 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
[workspace]
members = [
"raphtory",
# "raphtory-cypher",
# "raphtory-benchmark",
# "pometry-storage",
# "examples/rust",
# "examples/netflow",
# "examples/custom-gql-apis",
# "python",
# "js-raphtory",
# "raphtory-graphql",
"raphtory-cypher",
"raphtory-benchmark",
"pometry-storage",
"examples/rust",
"examples/netflow",
"examples/custom-gql-apis",
"python",
"js-raphtory",
"raphtory-graphql",
"raphtory-api",
]
default-members = [
"raphtory",
# "raphtory-graphql"
"raphtory-graphql"
]
resolver = "2"

Expand Down Expand Up @@ -43,9 +43,9 @@ debug = 0

[workspace.dependencies]
#[public-storage]
# pometry-storage = { version = ">=0.8.1", path = "pometry-storage" }
pometry-storage = { version = ">=0.8.1", path = "pometry-storage" }
#[private-storage]
pometry-storage = { path = "pometry-storage-private", package = "pometry-storage-private" }
# pometry-storage = { path = "pometry-storage-private", package = "pometry-storage-private" }
async-graphql = { version = ">=7.0.5, <7.0.8", features = [
"dynamic-schema",
] } # 7.0.8+ is borked, see https://github.com/async-graphql/async-graphql/issues/1586
Expand Down
8 changes: 3 additions & 5 deletions raphtory/src/db/api/mutation/import_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,9 @@ fn import_node_internal<
let props = row
.into_iter()
.zip(&keys)
.filter_map(|(prop, key)| {
prop.map(|prop| {
let prop_id = graph.resolve_node_property(key, prop.dtype(), false);
prop_id.map(|prop_id| (prop_id.inner(), prop))
})
.map(|((_, prop), key)| {
let prop_id = graph.resolve_node_property(key, prop.dtype(), false);
prop_id.map(|prop_id| (prop_id.inner(), prop))
})
.collect::<Result<Vec<_>, _>>()?;
graph.internal_add_node(t, node_internal, &props)?;
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/db/api/properties/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub trait TemporalPropertyViewOps {
}

pub trait TemporalPropertiesRowView {
fn rows(&self) -> BoxedLIter<(TimeIndexEntry, Vec<Option<Prop>>)>;
fn rows(&self) -> BoxedLIter<(TimeIndexEntry, Vec<(usize, Prop)>)>;
fn edge_ts(&self) -> BoxedLIter<TimeIndexEntry>;
}

Expand Down
20 changes: 17 additions & 3 deletions raphtory/src/db/api/storage/graph/storage_ops/time_semantics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,34 @@ impl TimeSemantics for GraphStorage {
&'a self,
v: VID,
w: Option<Range<i64>>,
) -> BoxedLIter<'a, (TimeIndexEntry, Vec<Option<Prop>>)> {
) -> BoxedLIter<'a, (TimeIndexEntry, Vec<(usize, Prop)>)> {
let node = self.node_entry(v);
GenLockedIter::from(node, |node| match w {
Some(range) => {
let range = TimeIndexEntry::range(range);
node.as_ref()
.temp_prop_rows_window(range)
.map(|(t, row)| (t, row.into_iter().map(|(_, p)| p).collect()))
.map(|(t, row)| {
(
t,
row.into_iter()
.filter_map(|(prop_id, p)| p.map(|p| (prop_id, p)))
.collect(),
)
})
.into_dyn_boxed()
}
None => node
.as_ref()
.temp_prop_rows()
.map(|(t, row)| (t, row.into_iter().map(|(_, p)| p).collect()))
.map(|(t, row)| {
(
t,
row.into_iter()
.filter_map(|(prop_id, p)| p.map(|p| (prop_id, p)))
.collect(),
)
})
.into_dyn_boxed(),
})
.into_dyn_boxed()
Expand Down
48 changes: 12 additions & 36 deletions raphtory/src/db/api/view/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
api::{
mutation::internal::InternalAdditionOps,
properties::{
internal::{ConstPropertiesOps, TemporalPropertiesOps},
internal::{ConstPropertiesOps, TemporalPropertiesOps, TemporalPropertiesRowView},
Properties,
},
storage::graph::{
Expand Down Expand Up @@ -245,38 +245,11 @@ impl<'graph, G: BoxableGraphView + Sized + Clone + 'graph> GraphViewOps<'graph>
.get_mut()
.update_t_prop_time(TimeIndexEntry::start(earliest), None)
}
// for (t, eid) in self.node_edge_history(node.node, None) {
// new_node.get_mut().update_time(t, eid);
// }

let node_entry = self.core_node_entry(node.node);

let props = 0..self.node_meta().temporal_prop_meta().len();
let start = self.view_start();
let end = self.view_end();

// let rows = if let Some(range) = start.zip(end).map(|(s, e)| s..e) {
// node_entry
// .as_ref()
// .temp_prop_rows_window(
// props.clone(),
// TimeIndexEntry::start(range.start)
// ..TimeIndexEntry::start(range.end),
// )
// .into_dyn_boxed()
// } else {
// node_entry
// .as_ref()
// .temp_prop_rows(props.clone())
// .into_dyn_boxed()
// };

// for (time, row) in rows {
// let prop_offset = new_node
// .t_props_log_mut()
// .push(row.into_iter().filter_map(|(id, prop)| Some((id, prop?))))?;
// new_node.get_mut().update_t_prop_time(time, prop_offset);
// }

for (t, rows) in node.rows() {
let prop_offset = new_node.t_props_log_mut().push(rows)?;
new_node.get_mut().update_t_prop_time(t, prop_offset);
}

for c_prop_id in node.const_prop_ids() {
if let Some(prop_value) = node.get_const_prop(c_prop_id) {
Expand Down Expand Up @@ -338,9 +311,10 @@ impl<'graph, G: BoxableGraphView + Sized + Clone + 'graph> GraphViewOps<'graph>
new_storage.nodes.par_iter_mut().try_for_each(|mut shard| {
for (eid, edge) in self.edges().iter().enumerate() {
if let Some(src_node) = shard.get_mut(node_map[edge.edge.src().index()]) {
for t in self.edge_history(edge.edge, self.layer_ids()) {
src_node.update_time(t, EID(eid));
}
for ee in edge.explode_layers() {
todo!("Add timestamps updates here from edge");
// self.edge_history(ee.edge, self.layer_ids())
src_node.add_edge(
node_map[edge.edge.dst().index()],
Direction::OUT,
Expand All @@ -350,8 +324,10 @@ impl<'graph, G: BoxableGraphView + Sized + Clone + 'graph> GraphViewOps<'graph>
}
}
if let Some(dst_node) = shard.get_mut(node_map[edge.edge.dst().index()]) {
for t in self.edge_history(edge.edge, self.layer_ids()) {
dst_node.update_time(t, EID(eid));
}
for ee in edge.explode_layers() {
todo!("Add timestamps updates here from edge");
dst_node.add_edge(
node_map[edge.edge.src().index()],
Direction::IN,
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/db/api/view/internal/time_semantics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub trait TimeSemantics {
&self,
v: VID,
w: Option<Range<i64>>,
) -> BoxedLIter<(TimeIndexEntry, Vec<Option<Prop>>)>;
) -> BoxedLIter<(TimeIndexEntry, Vec<(usize, Prop)>)>;

fn edge_history<'a>(
&'a self,
Expand Down Expand Up @@ -678,7 +678,7 @@ impl<G: DelegateTimeSemantics + ?Sized> TimeSemantics for G {
&self,
v: VID,
w: Option<Range<i64>>,
) -> BoxedLIter<(TimeIndexEntry, Vec<Option<Prop>>)> {
) -> BoxedLIter<(TimeIndexEntry, Vec<(usize, Prop)>)> {
self.graph().node_history_rows(v, w)
}
}
45 changes: 23 additions & 22 deletions raphtory/src/db/graph/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ mod db_tests {
.node(1)
.unwrap()
.rows()
.map(|(t, row)| (t, row.into_iter().flatten().collect::<Vec<_>>()))
.map(|(t, row)| (t, row.into_iter().map(|(_, a)| a).collect::<Vec<_>>()))
.collect::<Vec<_>>();

assert_eq!(actual, vec![(0.into(), vec![Prop::Bool(true)])]);
Expand All @@ -1419,7 +1419,7 @@ mod db_tests {
.node(1)
.unwrap()
.rows()
.map(|(t, row)| (t, row.into_iter().flatten().collect::<Vec<_>>()))
.map(|(t, row)| (t, row.into_iter().map(|(_, a)| a).collect::<Vec<_>>()))
.collect::<Vec<_>>();

assert_eq!(
Expand Down Expand Up @@ -1448,7 +1448,7 @@ mod db_tests {
.node(1)
.unwrap()
.rows()
.map(|(t, row)| (t, row.into_iter().flatten().collect::<Vec<_>>()))
.map(|(t, row)| (t, row.into_iter().map(|(_, a)| a).collect::<Vec<_>>()))
.collect::<Vec<_>>();

let expected = vec![
Expand All @@ -1471,7 +1471,7 @@ mod db_tests {
.node(1)
.unwrap()
.rows()
.map(|(t, row)| (t, row.into_iter().flatten().collect::<Vec<_>>()))
.map(|(t, row)| (t, row.into_iter().map(|(_, a)| a).collect::<Vec<_>>()))
.collect::<Vec<_>>();

let expected = vec![
Expand Down Expand Up @@ -1502,21 +1502,23 @@ mod db_tests {
.add_node(2, 3, [("cool".to_string(), Prop::U64(3))], None)
.unwrap();

for id in 0..3 {
let actual = graph
.core_graph()
.nodes()
.node(VID(id))
.temp_prop_rows()
.map(|(t, row)| (t, row.into_iter().map(|(_, p)| p).collect::<Vec<_>>()))
.collect::<Vec<_>>();
test_storage!(&graph, |graph| {
for id in 0..3 {
let actual = graph
.core_graph()
.nodes()
.node(VID(id))
.temp_prop_rows()
.map(|(t, row)| (t, row.into_iter().map(|(_, p)| p).collect::<Vec<_>>()))
.collect::<Vec<_>>();

let expected = vec![(
TimeIndexEntry::new(id as i64, id),
vec![Some(Prop::U64((id as u64) + 1))],
)];
assert_eq!(actual, expected);
}
let expected = vec![(
TimeIndexEntry::new(id as i64, id),
vec![Some(Prop::U64((id as u64) + 1))],
)];
assert_eq!(actual, expected);
}
});
}

#[test]
Expand Down Expand Up @@ -1577,8 +1579,7 @@ mod db_tests {
.add_node(3, 1, [("cool".to_string(), Prop::Bool(false))], None)
.unwrap();

// FIXME: boolean properties not yet supported (Issue #48)
test_graph(&graph, |graph| {
test_storage!(&graph, |graph| {
let wg = graph.window(3, 15);
let v = wg.node(1).unwrap();

Expand Down Expand Up @@ -2040,7 +2041,7 @@ mod db_tests {

let actual = node
.rows()
.map(|(t, vals)| (t, vals.into_iter().flatten().collect()))
.map(|(t, row)| (t, row.into_iter().map(|(_, a)| a).collect::<Vec<_>>()))
.collect::<Vec<_>>();

let expected = vec![
Expand All @@ -2059,7 +2060,7 @@ mod db_tests {

let actual = node
.rows()
.map(|(t, vals)| (t, vals.into_iter().flatten().collect()))
.map(|(t, row)| (t, row.into_iter().map(|(_, a)| a).collect::<Vec<_>>()))
.collect::<Vec<_>>();

let expected = vec![(TimeIndexEntry::new(1, 0), vec![Prop::U64(1)])];
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/db/graph/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl<G, GH: CoreGraphOps + TimeSemantics> TemporalPropertyViewOps for NodeView<G
}

impl<G, GH: CoreGraphOps + TimeSemantics> TemporalPropertiesRowView for NodeView<G, GH> {
fn rows(&self) -> BoxedLIter<(TimeIndexEntry, Vec<Option<Prop>>)> {
fn rows(&self) -> BoxedLIter<(TimeIndexEntry, Vec<(usize, Prop)>)> {
self.graph.node_history_rows(self.node, None)
}

Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/db/graph/views/deletion_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl TimeSemantics for PersistentGraph {
&self,
v: VID,
w: Option<Range<i64>>,
) -> BoxedLIter<(TimeIndexEntry, Vec<Option<Prop>>)> {
) -> BoxedLIter<(TimeIndexEntry, Vec<(usize, Prop)>)> {
self.0.node_history_rows(v, w)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl<'graph, G: GraphViewOps<'graph>> TimeSemantics for ExplodedEdgePropertyFilt
&self,
v: VID,
w: Option<Range<i64>>,
) -> BoxedLIter<(TimeIndexEntry, Vec<Option<Prop>>)> {
) -> BoxedLIter<(TimeIndexEntry, Vec<(usize, Prop)>)> {
self.graph.node_history_rows(v, w)
}

Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/db/graph/views/window_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl<'graph, G: GraphViewOps<'graph>> TimeSemantics for WindowedGraph<G> {
&self,
v: VID,
w: Option<Range<i64>>,
) -> BoxedLIter<(TimeIndexEntry, Vec<Option<Prop>>)> {
) -> BoxedLIter<(TimeIndexEntry, Vec<(usize, Prop)>)> {
if self.window_is_empty() {
return Box::new(std::iter::empty());
}
Expand Down
23 changes: 9 additions & 14 deletions raphtory/src/disk_graph/graph_impl/prop_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
},
core::{
entities::{properties::props::PropMapper, VID},
storage::timeindex::TimeIndexOps,
utils::iter::GenLockedIter,
PropType,
},
Expand Down Expand Up @@ -52,25 +51,21 @@ pub fn make_node_properties_from_graph(
let builder = NodePropsBuilder::new(n, graph_dir)
.with_timestamps(|vid| {
let node = gs.node_entry(vid);
let additions = node.additions();
additions.iter().collect()
node.as_ref().temp_prop_rows().map(|(ts, _)| ts).collect()
})
.with_temporal_props(temporal_prop_keys, |prop_id, prop_key, ts, offsets| {
let prop_type = temporal_meta.get_dtype(prop_id).unwrap();
let col = arrow_array_from_props(
(0..n).flat_map(|vid| {
// let ts = node_ts(VID(vid), offsets, ts);
let ts = node_ts(VID(vid), offsets, ts);
let node = gs.node_entry(VID(vid));
let t_prop_col = node.as_ref().tprop(prop_id);
// let iter =
// GenLockedIter::from(node, |node| Box::new(node.tprop(prop_id).iter()));
// iter.merge_join_by(ts, |(t2, _), &t1| t2.cmp(t1))
// .map(|result| match result {
// itertools::EitherOrBoth::Both((_, t_prop), _) => Some(t_prop),
// _ => None,
// })

t_prop_col.iter()
let iter =
GenLockedIter::from(node, |node| Box::new(node.tprop(prop_id).iter()));
iter.merge_join_by(ts, |(t2, _), &t1| t2.cmp(t1))
.map(|result| match result {
itertools::EitherOrBoth::Both((_, t_prop), _) => Some(t_prop),
_ => None,
})
}),
prop_type,
);
Expand Down
Loading

0 comments on commit 243595d

Please sign in to comment.