diff --git a/python/src/lib.rs b/python/src/lib.rs index bfe089b579..3d159efea0 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -25,6 +25,7 @@ use raphtory_core::python::{ }; use raphtory_storage::python::packages::algorithms::*; +#[cfg(feature = "arrow")] use raphtory_core::python::graph::arrow::{PyArrowGraph, PyGraphQuery, PyState}; macro_rules! add_functions { diff --git a/raphtory/src/algorithms/motifs/triangle_count.rs b/raphtory/src/algorithms/motifs/triangle_count.rs index 611ac09f4d..6c95d0973c 100644 --- a/raphtory/src/algorithms/motifs/triangle_count.rs +++ b/raphtory/src/algorithms/motifs/triangle_count.rs @@ -76,7 +76,7 @@ pub fn triangle_count(graph: &G, threads: Option) let step1 = ATask::new(move |s: &mut EvalNodeView, ()>| { for t in s.neighbours() { if s.node > t.node { - t.update(&neighbours_set, s.node.node); + t.update(&neighbours_set, s.node); } } Step::Continue diff --git a/raphtory/src/db/task/edge/eval_edge.rs b/raphtory/src/db/task/edge/eval_edge.rs index ba97f880b7..5f0a19915f 100644 --- a/raphtory/src/db/task/edge/eval_edge.rs +++ b/raphtory/src/db/task/edge/eval_edge.rs @@ -18,11 +18,13 @@ use crate::{ use crate::db::task::edge::eval_edges::EvalEdges; +use crate::db::api::storage::storage_ops::GraphStorage; use std::{cell::RefCell, rc::Rc}; pub struct EvalEdgeView<'graph, 'a, G, GH, CS: Clone, S> { pub(crate) ss: usize, pub(crate) edge: EdgeView<&'graph G, GH>, + pub(crate) storage: &'graph GraphStorage, pub(crate) node_state: Rc>>, pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, } @@ -39,12 +41,14 @@ impl< pub(crate) fn new( ss: usize, edge: EdgeView<&'graph G, GH>, + storage: &'graph GraphStorage, node_state: Rc>>, local_state_prev: &'graph PrevLocalState<'a, S>, ) -> Self { Self { ss, edge, + storage, node_state, local_state_prev, } @@ -97,9 +101,13 @@ impl< let ss = self.ss; let node_state = self.node_state.clone(); let local_state_prev = self.local_state_prev; + let storage = self.storage; EvalNodeView { ss, - node, + node: node.node, + graph: node.base_graph, + base_graph: node.base_graph, + storage, local_state: None, local_state_prev, node_state, @@ -117,9 +125,11 @@ impl< let ss = self.ss; let node_state = self.node_state.clone(); let local_state_prev = self.local_state_prev; + let storage = self.storage; EvalEdges { ss, edges, + storage, node_state, local_state_prev, } @@ -139,6 +149,7 @@ impl< Self { ss: self.ss, edge: self.edge.clone(), + storage: self.storage, node_state: self.node_state.clone(), local_state_prev: self.local_state_prev, } @@ -174,6 +185,7 @@ impl< EvalEdgeView::new( self.ss, edge, + self.storage, self.node_state.clone(), self.local_state_prev, ) diff --git a/raphtory/src/db/task/edge/eval_edges.rs b/raphtory/src/db/task/edge/eval_edges.rs index e9b783de45..5021b24962 100644 --- a/raphtory/src/db/task/edge/eval_edges.rs +++ b/raphtory/src/db/task/edge/eval_edges.rs @@ -6,6 +6,7 @@ use crate::{ db::{ api::{ properties::Properties, + storage::storage_ops::GraphStorage, view::{internal::OneHopFilter, BaseEdgeViewOps, BoxedLIter}, }, graph::edges::Edges, @@ -22,6 +23,7 @@ use std::{cell::RefCell, rc::Rc}; pub struct EvalEdges<'graph, 'a, G, GH, CS: Clone, S> { pub(crate) ss: usize, pub(crate) edges: Edges<'graph, &'graph G, GH>, + pub(crate) storage: &'graph GraphStorage, pub(crate) node_state: Rc>>, pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, } @@ -57,9 +59,11 @@ impl<'graph, 'a: 'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'graph>, CS: let ss = self.ss; let node_state = self.node_state.clone(); let local_state_prev = self.local_state_prev; + let storage = self.storage; EvalEdges { ss, edges, + storage, node_state, local_state_prev, } @@ -79,9 +83,11 @@ impl< let node_state = self.node_state.clone(); let ss = self.ss; let local_state_prev = self.local_state_prev; + let storage = self.storage; self.edges.iter().map(move |edge| EvalEdgeView { ss, edge, + storage, node_state: node_state.clone(), local_state_prev, }) @@ -104,9 +110,11 @@ impl< let node_state = self.node_state; let ss = self.ss; let local_state_prev = self.local_state_prev; + let storage = self.storage; Box::new(self.edges.iter().map(move |edge| EvalEdgeView { ss, edge, + storage, node_state: node_state.clone(), local_state_prev, })) @@ -159,11 +167,15 @@ impl< let node_state = self.node_state.clone(); let local_state_prev = self.local_state_prev; let path = self.edges.map_nodes(op); + let base_graph = self.edges.base_graph; EvalPathFromNode { - path, + graph: base_graph, + base_graph: base_graph, + op: path.op, ss, node_state, local_state_prev, + storage: self.storage, } } @@ -178,8 +190,10 @@ impl< let node_state = self.node_state.clone(); let local_state_prev = self.local_state_prev; let edges = self.edges.map_exploded(op); + let storage = self.storage; Self { ss, + storage, node_state, local_state_prev, edges, diff --git a/raphtory/src/db/task/node/eval_node.rs b/raphtory/src/db/task/node/eval_node.rs index f44d15780a..a799159c09 100644 --- a/raphtory/src/db/task/node/eval_node.rs +++ b/raphtory/src/db/task/node/eval_node.rs @@ -12,9 +12,9 @@ use crate::{ api::{ properties::Properties, storage::storage_ops::GraphStorage, - view::{internal::OneHopFilter, BaseNodeViewOps}, + view::{internal::OneHopFilter, BaseNodeViewOps, BoxedLIter, IntoDynBoxed}, }, - graph::{node::NodeView, path::PathFromNode}, + graph::{edges::Edges, node::NodeView, path::PathFromNode}, task::{ edge::eval_edges::EvalEdges, node::eval_node_state::EVState, task_state::PrevLocalState, }, @@ -24,11 +24,15 @@ use crate::{ use std::{ cell::{Ref, RefCell}, rc::Rc, + sync::Arc, }; pub struct EvalNodeView<'graph, 'a: 'graph, G, S, GH = &'graph G, CS: Clone = ComputeStateVec> { pub(crate) ss: usize, - pub(crate) node: NodeView<&'graph G, GH>, + pub(crate) node: VID, + pub(crate) base_graph: &'graph G, + pub(crate) graph: GH, + pub(crate) storage: &'graph GraphStorage, pub(crate) local_state: Option<&'graph mut S>, pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, pub(crate) node_state: Rc>>, @@ -39,20 +43,19 @@ impl<'graph, 'a: 'graph, G: GraphViewOps<'graph>, CS: ComputeState + 'a, S> { pub(crate) fn new_local( ss: usize, - v_ref: VID, + node: VID, g: &'graph G, + storage: &'graph GraphStorage, local_state: Option<&'graph mut S>, local_state_prev: &'graph PrevLocalState<'a, S>, node_state: Rc>>, ) -> Self { - let node = NodeView { - base_graph: g, - graph: g, - node: v_ref, - }; Self { ss, node, + base_graph: g, + graph: g, + storage, local_state, local_state_prev, node_state, @@ -70,9 +73,12 @@ impl< > Clone for EvalNodeView<'graph, 'a, G, S, GH, CS> { fn clone(&self) -> Self { - EvalNodeView::new_from_node( + EvalNodeView::new_filtered( self.ss, - self.node.clone(), + self.node, + self.base_graph, + self.graph.clone(), + self.storage, None, self.local_state_prev, self.node_state.clone(), @@ -90,10 +96,10 @@ impl< > EvalNodeView<'graph, 'a, G, S, GH, CS> { pub fn graph(&self) -> GH { - self.node.graph.clone() + self.graph.clone() } pub fn prev(&self) -> &S { - let VID(i) = self.node.node; + let VID(i) = self.node; &self.local_state_prev.state[i] } @@ -111,9 +117,12 @@ impl< } } - pub(crate) fn new_from_node( + pub(crate) fn new_filtered( ss: usize, - node: NodeView<&'graph G, GH>, + node: VID, + base_graph: &'graph G, + graph: GH, + storage: &'graph GraphStorage, local_state: Option<&'graph mut S>, local_state_prev: &'graph PrevLocalState<'a, S>, node_state: Rc>>, @@ -121,27 +130,17 @@ impl< Self { ss, node, + base_graph, + graph, + storage, local_state, local_state_prev, node_state, } } - pub(crate) fn update_node>( - &self, - node: NodeView<&'graph G, GHH>, - ) -> EvalNodeView<'graph, 'a, G, S, GHH, CS> { - EvalNodeView::new_from_node( - self.ss, - node, - None, - self.local_state_prev, - self.node_state.clone(), - ) - } - fn pid(&self) -> usize { - let VID(i) = self.node.node; + let VID(i) = self.node; i } @@ -222,7 +221,7 @@ impl< A: StateType, OUT: std::fmt::Debug, { - Entry::new(self.node_state.borrow(), *agg_r, &self.node.node, self.ss) + Entry::new(self.node_state.borrow(), *agg_r, &self.node, self.ss) } /// Read the prev value of the node state using the given accumulator. @@ -266,7 +265,10 @@ pub struct EvalPathFromNode< CS: ComputeState, S, > { - pub(crate) path: PathFromNode<'graph, &'graph G, GH>, + pub graph: GH, + pub(crate) base_graph: &'graph G, + pub(crate) op: Arc BoxedLIter<'graph, VID> + Send + Sync + 'graph>, + pub(crate) storage: &'graph GraphStorage, pub(crate) ss: usize, pub(crate) node_state: Rc>>, pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, @@ -281,13 +283,29 @@ impl< GH: GraphViewOps<'graph>, > EvalPathFromNode<'graph, 'a, G, GH, CS, S> { + fn iter_refs(&self) -> impl Iterator + 'graph { + (self.op)() + } + pub fn iter(&self) -> impl Iterator> + 'graph { let local = self.local_state_prev; let node_state = self.node_state.clone(); let ss = self.ss; - self.path - .iter() - .map(move |v| EvalNodeView::new_from_node(ss, v, None, local, node_state.clone())) + let base_graph = self.base_graph; + let graph = self.graph.clone(); + let storage = self.storage; + self.iter_refs().map(move |v| { + EvalNodeView::new_filtered( + ss, + v, + base_graph, + graph.clone(), + storage, + None, + local, + node_state.clone(), + ) + }) } } @@ -319,7 +337,10 @@ impl< { fn clone(&self) -> Self { EvalPathFromNode { - path: self.path.clone(), + graph: self.graph.clone(), + base_graph: self.base_graph, + op: self.op.clone(), + storage: self.storage, ss: self.ss, node_state: self.node_state.clone(), local_state_prev: self.local_state_prev, @@ -350,11 +371,13 @@ impl< &self, op: F, ) -> Self::ValueType { - self.path.map(op) + let graph = self.graph.clone(); + let storage = self.storage; + Box::new(self.iter_refs().map(move |node| op(storage, &graph, node))) } fn as_props(&self) -> Self::ValueType> { - self.path.as_props() + self.map(|_cg, g, v| Properties::new(NodeView::new_internal(g.clone(), v))) } fn map_edges< @@ -367,12 +390,19 @@ impl< let local_state_prev = self.local_state_prev; let node_state = self.node_state.clone(); let ss = self.ss; - let edges = self.path.map_edges(op); + let storage = self.storage; + let path = PathFromNode::new_one_hop_filtered( + self.base_graph, + self.graph.clone(), + self.op.clone(), + ); + let edges = path.map_edges(op); EvalEdges { ss, edges, node_state, local_state_prev, + storage, } } @@ -383,12 +413,24 @@ impl< &self, op: F, ) -> Self::PathType { - let path = self.path.hop(op); + let old_op = self.op.clone(); + let graph = self.graph.clone(); + let storage = self.storage; + let new_op = Arc::new(move || { + let op = op.clone(); + let graph = graph.clone(); + old_op() + .flat_map(move |vv| op(storage, &graph, vv)) + .into_dyn_boxed() + }); let ss = self.ss; let node_state = self.node_state.clone(); let local_state_prev = self.local_state_prev; EvalPathFromNode { - path, + graph: self.base_graph, + base_graph: self.base_graph, + op: new_op, + storage, ss, node_state, local_state_prev, @@ -410,23 +452,26 @@ impl< type Filtered> = EvalPathFromNode<'graph, 'a, G, GHH, CS, S>; fn current_filter(&self) -> &Self::FilteredGraph { - self.path.current_filter() + &self.graph } fn base_graph(&self) -> &Self::BaseGraph { - &self.path.base_graph + &self.base_graph } fn one_hop_filtered>( &self, filtered_graph: GHH, ) -> Self::Filtered { - let path = self.path.one_hop_filtered(filtered_graph); + let storage = self.storage; let local_state_prev = self.local_state_prev; let node_state = self.node_state.clone(); let ss = self.ss; EvalPathFromNode { - path, + graph: filtered_graph, + base_graph: self.base_graph, + op: self.op.clone(), + storage, ss, node_state, local_state_prev, @@ -448,19 +493,27 @@ impl< type Filtered> = EvalNodeView<'graph, 'a, G, S, GHH, CS>; fn current_filter(&self) -> &Self::FilteredGraph { - &self.node.graph + &self.graph } fn base_graph(&self) -> &Self::BaseGraph { - &self.node.base_graph + &self.base_graph } fn one_hop_filtered>( &self, filtered_graph: GHH, ) -> Self::Filtered { - let node = self.node.one_hop_filtered(filtered_graph); - self.update_node(node) + EvalNodeView::new_filtered( + self.ss, + self.node, + self.base_graph, + filtered_graph, + self.storage, + None, + self.local_state_prev, + self.node_state.clone(), + ) } } @@ -476,7 +529,7 @@ impl< type BaseGraph = &'graph G; type Graph = GH; type ValueType = T where T: 'graph; - type PropType = NodeView<&'graph G, GH>; + type PropType = NodeView; type PathType = EvalPathFromNode<'graph, 'a, G, &'graph G, CS, S>; type Edges = EvalEdges<'graph, 'a, G, GH, CS, S>; @@ -487,11 +540,11 @@ impl< &self, op: F, ) -> Self::ValueType { - self.node.map(op) + op(self.storage, &self.graph, self.node) } fn as_props(&self) -> Self::ValueType> { - self.node.as_props() + Properties::new(NodeView::new_internal(self.graph.clone(), self.node)) } fn map_edges< @@ -504,12 +557,21 @@ impl< let ss = self.ss; let local_state_prev = self.local_state_prev; let node_state = self.node_state.clone(); - let edges = self.node.map_edges(op); + let node = self.node; + let storage = self.storage; + let graph = self.graph.clone(); + let edges = Arc::new(move || op(storage, &graph, node).into_dyn_boxed()); + let edges = Edges { + base_graph: self.base_graph, + graph: self.graph.clone(), + edges, + }; EvalEdges { ss, edges, node_state, local_state_prev, + storage, } } @@ -520,12 +582,19 @@ impl< &self, op: F, ) -> Self::PathType { - let path = self.node.hop(op); + let graph = self.graph.clone(); + let node = self.node; + let storage = self.storage; + + let path_op = Arc::new(move || op(storage, &graph, node).into_dyn_boxed()); let ss = self.ss; let local_state_prev = self.local_state_prev; let node_state = self.node_state.clone(); EvalPathFromNode { - path, + graph: self.base_graph, + base_graph: self.base_graph, + op: path_op, + storage, local_state_prev, node_state, ss, diff --git a/raphtory/src/db/task/task_runner.rs b/raphtory/src/db/task/task_runner.rs index 049a8b877d..735ed98b20 100644 --- a/raphtory/src/db/task/task_runner.rs +++ b/raphtory/src/db/task/task_runner.rs @@ -14,7 +14,7 @@ use crate::{ }, }, db::{ - api::view::StaticGraphViewOps, + api::{storage::storage_ops::GraphStorage, view::StaticGraphViewOps}, task::node::{eval_node::EvalNodeView, eval_node_state::EVState}, }, }; @@ -50,6 +50,7 @@ impl TaskRunner { global_state: &Global, morcel: &mut [S], prev_local_state: &Vec, + storage: &GraphStorage, atomic_done: &AtomicBool, morcel_size: usize, morcel_id: usize, @@ -73,6 +74,7 @@ impl TaskRunner { self.ctx.ss(), v_ref.into(), &g, + storage, Some(local_state), &local, node_state.clone(), @@ -124,6 +126,7 @@ impl TaskRunner { global_state: Global, mut local_state: Vec, prev_local_state: &Vec, + storage: &GraphStorage, ) -> (bool, Shard, Global, Vec) { pool.install(move || { let mut new_shard_state = shard_state; @@ -144,6 +147,7 @@ impl TaskRunner { &new_global_state, morcel, prev_local_state, + storage, &atomic_done, morcel_size, morcel_id, @@ -161,6 +165,7 @@ impl TaskRunner { &new_global_state, morcel, prev_local_state, + storage, &atomic_done, morcel_size, morcel_id, @@ -220,6 +225,7 @@ impl TaskRunner { let pool = num_threads.map(custom_pool).unwrap_or_else(|| POOL.clone()); let num_nodes = self.ctx.graph().unfiltered_num_nodes(); + let storage = self.ctx.graph().core_graph(); let morcel_size = num_nodes.min(16_000); let num_chunks = if morcel_size == 0 { 1 @@ -245,6 +251,7 @@ impl TaskRunner { global_state, cur_local_state, &prev_local_state, + &storage, ); // To allow the init step to cache stuff we will copy everything from cur_local_state to prev_local_state @@ -259,6 +266,7 @@ impl TaskRunner { global_state, cur_local_state, &prev_local_state, + &storage, ); // copy and reset the state from the step that just ended