From 49d2647134f01fbd49e669e209d083cab5ad460f Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Mon, 27 May 2024 12:17:12 +0200 Subject: [PATCH 01/15] put the eval context into a struct and expose methods for getting arbitrary eval nodes and edges --- examples/rust/src/bin/hulongbay/main.rs | 18 +- .../global_temporal_three_node_motifs.rs | 21 +- .../local_temporal_three_node_motifs.rs | 8 +- .../src/db/api/storage/nodes/node_entry.rs | 4 + .../db/api/storage/nodes/node_owned_entry.rs | 4 + raphtory/src/db/api/storage/nodes/node_ref.rs | 4 + .../db/api/storage/nodes/node_storage_ops.rs | 6 + raphtory/src/db/api/view/node.rs | 4 +- raphtory/src/db/task/edge/eval_edge.rs | 16 +- raphtory/src/db/task/edge/eval_edges.rs | 15 +- raphtory/src/db/task/eval_graph.rs | 61 +++++ raphtory/src/db/task/mod.rs | 1 + raphtory/src/db/task/node/eval_node.rs | 212 +++++++----------- raphtory/src/db/task/task_runner.rs | 24 +- raphtory/src/db/task/task_state.rs | 2 +- 15 files changed, 217 insertions(+), 183 deletions(-) create mode 100644 raphtory/src/db/task/eval_graph.rs diff --git a/examples/rust/src/bin/hulongbay/main.rs b/examples/rust/src/bin/hulongbay/main.rs index d3702e10f1..9ec0207363 100644 --- a/examples/rust/src/bin/hulongbay/main.rs +++ b/examples/rust/src/bin/hulongbay/main.rs @@ -2,7 +2,13 @@ #![allow(dead_code)] use itertools::Itertools; use raphtory::{ - algorithms::{components::weakly_connected_components, motifs::triangle_count::triangle_count}, + algorithms::{ + components::weakly_connected_components, + motifs::{ + global_temporal_three_node_motifs::global_temporal_three_node_motif, + triangle_count::triangle_count, + }, + }, graph_loader::source::csv_loader::CsvLoader, prelude::*, }; @@ -207,8 +213,16 @@ fn try_main_bm() -> Result<(), Box> { Ok(()) } +fn try_motif() -> Result<(), Box> { + let args: Vec = env::args().collect(); + let data_dir = Path::new(args.get(1).ok_or(MissingArgumentError)?); + let graph = loader(data_dir)?; + global_temporal_three_node_motif(&graph, 3600, None); + Ok(()) +} + fn main() { - if let Err(e) = try_main_bm() { + if let Err(e) = try_motif() { eprintln!("Failed: {}", e); std::process::exit(1) } diff --git a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs index f57c9f50f5..a2b6bd2699 100644 --- a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs @@ -23,15 +23,11 @@ use std::collections::HashMap; /////////////////////////////////////////////////////// -pub fn star_motif_count( - graph: &G, - evv: &EvalNodeView, - deltas: Vec, -) -> Vec<[usize; 32]> +pub fn star_motif_count(evv: &EvalNodeView, deltas: Vec) -> Vec<[usize; 32]> where G: StaticGraphViewOps, { - let two_n_c = twonode_motif_count(graph, evv, deltas.clone()); + let two_n_c = twonode_motif_count(evv, deltas.clone()); let neigh_map: HashMap = evv .neighbours() .into_iter() @@ -75,11 +71,7 @@ where /////////////////////////////////////////////////////// -pub fn twonode_motif_count( - graph: &G, - evv: &EvalNodeView, - deltas: Vec, -) -> Vec<[usize; 8]> +pub fn twonode_motif_count(evv: &EvalNodeView, deltas: Vec) -> Vec<[usize; 8]> where G: StaticGraphViewOps, { @@ -87,8 +79,8 @@ where for nb in evv.neighbours().into_iter() { let nb_id = nb.id(); - let out = graph.edge(evv.id(), nb_id); - let inc = graph.edge(nb_id, evv.id()); + let out = evv.graph().edge(evv.id(), nb_id); + let inc = evv.graph().edge(nb_id, evv.id()); let events: Vec = out .iter() .flat_map(|e| e.explode()) @@ -265,8 +257,7 @@ where let out1 = triangle_motifs(g, deltas.clone(), threads); let step1 = ATask::new(move |evv: &mut EvalNodeView| { - let g = evv.graph(); - let star_nodes = star_motif_count(g, evv, deltas.clone()); + let star_nodes = star_motif_count(evv, deltas.clone()); for (i, star) in star_nodes.iter().enumerate() { evv.global_update(&star_mc[i], *star); } diff --git a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs index 794dbdf5ec..b8ecc03656 100644 --- a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs @@ -126,7 +126,6 @@ where /////////////////////////////////////////////////////// pub fn twonode_motif_count<'a, 'b, G, GH>( - graph: &'a G, evv: &'a EvalNodeView<'b, '_, G, MotifCounter, GH>, deltas: Vec, ) -> Vec<[usize; 8]> @@ -144,8 +143,8 @@ where for nb in evv.neighbours().into_iter() { let nb_id = nb.id(); - let out = graph.edge(evv.id(), nb_id); - let inc = graph.edge(nb_id, evv.id()); + let out = evv.graph().edge(evv.id(), nb_id); + let inc = evv.graph().edge(nb_id, evv.id()); let events: Vec = out .iter() .flat_map(|e| e.explode()) @@ -337,8 +336,7 @@ where let out1 = triangle_motifs(g, deltas.clone(), motifs_counter, threads); let step1 = ATask::new(move |evv: &mut EvalNodeView| { - let g = evv.graph(); - let two_nodes = twonode_motif_count(g, evv, deltas.clone()); + let two_nodes = twonode_motif_count(evv, deltas.clone()); let star_nodes = star_motif_count(evv, deltas.clone()); *evv.get_mut() = MotifCounter::new( diff --git a/raphtory/src/db/api/storage/nodes/node_entry.rs b/raphtory/src/db/api/storage/nodes/node_entry.rs index 4d8e3b8398..d699b2b59e 100644 --- a/raphtory/src/db/api/storage/nodes/node_entry.rs +++ b/raphtory/src/db/api/storage/nodes/node_entry.rs @@ -97,6 +97,10 @@ impl<'a, 'b: 'a> NodeStorageOps<'a> for &'a NodeStorageEntry<'b> { for_all!(self, node => node.vid()) } + fn id(self) -> u64 { + for_all!(self, node => node.id()) + } + fn name(self) -> Option<&'a str> { for_all!(self, node => node.name()) } diff --git a/raphtory/src/db/api/storage/nodes/node_owned_entry.rs b/raphtory/src/db/api/storage/nodes/node_owned_entry.rs index 33b3990bbb..8d16aa3297 100644 --- a/raphtory/src/db/api/storage/nodes/node_owned_entry.rs +++ b/raphtory/src/db/api/storage/nodes/node_owned_entry.rs @@ -94,6 +94,10 @@ impl<'a> NodeStorageOps<'a> for &'a NodeOwnedEntry { for_all!(self, node => node.vid()) } + fn id(self) -> u64 { + for_all!(self, node => node.id()) + } + fn name(self) -> Option<&'a str> { for_all!(self, node => node.name()) } diff --git a/raphtory/src/db/api/storage/nodes/node_ref.rs b/raphtory/src/db/api/storage/nodes/node_ref.rs index 6d26c33d47..2fa626818e 100644 --- a/raphtory/src/db/api/storage/nodes/node_ref.rs +++ b/raphtory/src/db/api/storage/nodes/node_ref.rs @@ -91,6 +91,10 @@ impl<'a> NodeStorageOps<'a> for NodeStorageRef<'a> { for_all!(self, node => node.vid()) } + fn id(self) -> u64 { + for_all!(self, node => node.id()) + } + fn name(self) -> Option<&'a str> { for_all!(self, node => node.name()) } diff --git a/raphtory/src/db/api/storage/nodes/node_storage_ops.rs b/raphtory/src/db/api/storage/nodes/node_storage_ops.rs index 027886d50b..14bb843e67 100644 --- a/raphtory/src/db/api/storage/nodes/node_storage_ops.rs +++ b/raphtory/src/db/api/storage/nodes/node_storage_ops.rs @@ -25,6 +25,8 @@ pub trait NodeStorageOps<'a>: Sized { fn vid(self) -> VID; + fn id(self) -> u64; + fn name(self) -> Option<&'a str>; fn find_edge(self, dst: VID, layer_ids: &LayerIds) -> Option; @@ -59,6 +61,10 @@ impl<'a> NodeStorageOps<'a> for &'a NodeStore { self.vid } + fn id(self) -> u64 { + self.global_id + } + fn name(self) -> Option<&'a str> { self.name.as_str() } diff --git a/raphtory/src/db/api/view/node.rs b/raphtory/src/db/api/view/node.rs index 05f077ea25..5a57a0a68b 100644 --- a/raphtory/src/db/api/view/node.rs +++ b/raphtory/src/db/api/view/node.rs @@ -6,7 +6,7 @@ use crate::{ }, db::api::{ properties::{internal::PropertiesOps, Properties}, - storage::storage_ops::GraphStorage, + storage::{nodes::node_storage_ops::NodeStorageOps, storage_ops::GraphStorage}, view::{ internal::{CoreGraphOps, OneHopFilter, TimeSemantics}, reset_filter::ResetFilter, @@ -175,7 +175,7 @@ impl<'graph, V: BaseNodeViewOps<'graph> + 'graph> NodeViewOps<'graph> for V { #[inline] fn id(&self) -> Self::ValueType { - self.map(|_cg, g, v| g.node_id(v)) + self.map(|cg, _g, v| cg.node(v).id()) } #[inline] fn name(&self) -> Self::ValueType { diff --git a/raphtory/src/db/task/edge/eval_edge.rs b/raphtory/src/db/task/edge/eval_edge.rs index 5f0a19915f..29b022aadf 100644 --- a/raphtory/src/db/task/edge/eval_edge.rs +++ b/raphtory/src/db/task/edge/eval_edge.rs @@ -18,7 +18,7 @@ use crate::{ use crate::db::task::edge::eval_edges::EvalEdges; -use crate::db::api::storage::storage_ops::GraphStorage; +use crate::db::{api::storage::storage_ops::GraphStorage, task::eval_graph::EvalGraph}; use std::{cell::RefCell, rc::Rc}; pub struct EvalEdgeView<'graph, 'a, G, GH, CS: Clone, S> { @@ -102,15 +102,19 @@ impl< let node_state = self.node_state.clone(); let local_state_prev = self.local_state_prev; let storage = self.storage; - EvalNodeView { + let base_graph = self.edge.base_graph; + let eval_graph = EvalGraph { ss, - node: node.node, - graph: node.base_graph, - base_graph: node.base_graph, + base_graph, storage, - local_state: None, local_state_prev, node_state, + }; + EvalNodeView { + node: node.node, + graph: node.base_graph, + eval_graph, + local_state: None, } } diff --git a/raphtory/src/db/task/edge/eval_edges.rs b/raphtory/src/db/task/edge/eval_edges.rs index d11bd896fe..afbea45040 100644 --- a/raphtory/src/db/task/edge/eval_edges.rs +++ b/raphtory/src/db/task/edge/eval_edges.rs @@ -12,6 +12,7 @@ use crate::{ graph::edges::Edges, task::{ edge::eval_edge::EvalEdgeView, + eval_graph::EvalGraph, node::{eval_node::EvalPathFromNode, eval_node_state::EVState}, task_state::PrevLocalState, }, @@ -168,14 +169,18 @@ impl< let local_state_prev = self.local_state_prev; let path = self.edges.map_nodes(op); let base_graph = self.edges.base_graph; + let storage = self.storage; + let eval_graph = EvalGraph { + ss, + base_graph, + storage, + local_state_prev, + node_state, + }; EvalPathFromNode { graph: base_graph, - base_graph, + base_graph: eval_graph, op: path.op, - ss, - node_state, - local_state_prev, - storage: self.storage, } } diff --git a/raphtory/src/db/task/eval_graph.rs b/raphtory/src/db/task/eval_graph.rs new file mode 100644 index 0000000000..421d3c9379 --- /dev/null +++ b/raphtory/src/db/task/eval_graph.rs @@ -0,0 +1,61 @@ +use crate::{ + core::{ + entities::nodes::node_ref::AsNodeRef, + state::compute_state::{ComputeState, ComputeStateVec}, + }, + db::{ + api::storage::storage_ops::GraphStorage, + task::{ + edge::eval_edge::EvalEdgeView, + node::{eval_node::EvalNodeView, eval_node_state::EVState}, + task_state::PrevLocalState, + }, + }, + prelude::GraphViewOps, +}; +use std::{cell::RefCell, rc::Rc}; + +#[derive(Debug)] +pub struct EvalGraph<'graph, 'a, G, S, CS: Clone = ComputeStateVec> { + pub(crate) ss: usize, + pub(crate) base_graph: &'graph G, + pub(crate) storage: &'graph GraphStorage, + pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, + pub(crate) node_state: Rc>>, +} + +impl<'graph, 'a, G, S, CS: Clone> Clone for EvalGraph<'graph, 'a, G, S, CS> { + fn clone(&self) -> Self { + Self { + ss: self.ss, + base_graph: self.base_graph, + storage: self.storage, + local_state_prev: self.local_state_prev, + node_state: self.node_state.clone(), + } + } +} + +impl<'graph, 'a: 'graph, G: GraphViewOps<'graph>, S: 'static, CS: ComputeState + Clone> + EvalGraph<'graph, 'a, G, S, CS> +{ + pub fn node(&self, n: impl AsNodeRef) -> Option> { + let node = (&self.base_graph).node(n)?; + Some(EvalNodeView::new_local(node.node, self.clone(), None)) + } + + pub fn edge( + &self, + src: N, + dst: N, + ) -> Option> { + let edge = (&self.base_graph).edge(src, dst)?; + Some(EvalEdgeView::new( + self.ss, + edge, + self.storage, + self.node_state.clone(), + self.local_state_prev, + )) + } +} diff --git a/raphtory/src/db/task/mod.rs b/raphtory/src/db/task/mod.rs index afb69622d4..141ef726e9 100644 --- a/raphtory/src/db/task/mod.rs +++ b/raphtory/src/db/task/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; pub mod context; pub mod edge; +mod eval_graph; pub mod node; pub mod task; pub mod task_runner; diff --git a/raphtory/src/db/task/node/eval_node.rs b/raphtory/src/db/task/node/eval_node.rs index a799159c09..5181887161 100644 --- a/raphtory/src/db/task/node/eval_node.rs +++ b/raphtory/src/db/task/node/eval_node.rs @@ -12,11 +12,12 @@ use crate::{ api::{ properties::Properties, storage::storage_ops::GraphStorage, - view::{internal::OneHopFilter, BaseNodeViewOps, BoxedLIter, IntoDynBoxed}, + view::{internal::OneHopFilter, Base, BaseNodeViewOps, BoxedLIter, IntoDynBoxed}, }, graph::{edges::Edges, node::NodeView, path::PathFromNode}, task::{ - edge::eval_edges::EvalEdges, node::eval_node_state::EVState, task_state::PrevLocalState, + edge::eval_edges::EvalEdges, eval_graph::EvalGraph, node::eval_node_state::EVState, + task_state::PrevLocalState, }, }, prelude::{GraphViewOps, NodeTypesFilter}, @@ -28,37 +29,26 @@ use std::{ }; pub struct EvalNodeView<'graph, 'a: 'graph, G, S, GH = &'graph G, CS: Clone = ComputeStateVec> { - pub(crate) ss: usize, - pub(crate) node: VID, - pub(crate) base_graph: &'graph G, + pub node: VID, + pub(crate) eval_graph: EvalGraph<'graph, 'a, G, S, CS>, pub(crate) graph: GH, - pub(crate) storage: &'graph GraphStorage, pub(crate) local_state: Option<&'graph mut S>, - pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, - pub(crate) node_state: Rc>>, } impl<'graph, 'a: 'graph, G: GraphViewOps<'graph>, CS: ComputeState + 'a, S> EvalNodeView<'graph, 'a, G, S, &'graph G, CS> { pub(crate) fn new_local( - ss: usize, node: VID, - g: &'graph G, - storage: &'graph GraphStorage, + eval_graph: EvalGraph<'graph, 'a, G, S, CS>, local_state: Option<&'graph mut S>, - local_state_prev: &'graph PrevLocalState<'a, S>, - node_state: Rc>>, ) -> Self { + let graph = eval_graph.base_graph; Self { - ss, node, - base_graph: g, - graph: g, - storage, + eval_graph, + graph, local_state, - local_state_prev, - node_state, } } } @@ -73,16 +63,12 @@ impl< > Clone for EvalNodeView<'graph, 'a, G, S, GH, CS> { fn clone(&self) -> Self { - EvalNodeView::new_filtered( - self.ss, - self.node, - self.base_graph, - self.graph.clone(), - self.storage, - None, - self.local_state_prev, - self.node_state.clone(), - ) + Self { + node: self.node, + eval_graph: self.eval_graph.clone(), + graph: self.graph.clone(), + local_state: None, + } } } @@ -95,12 +81,12 @@ impl< GH: GraphViewOps<'graph>, > EvalNodeView<'graph, 'a, G, S, GH, CS> { - pub fn graph(&self) -> GH { - self.graph.clone() + pub fn graph(&self) -> EvalGraph<'graph, 'a, G, S, CS> { + self.eval_graph.clone() } pub fn prev(&self) -> &S { let VID(i) = self.node; - &self.local_state_prev.state[i] + &self.eval_graph.local_state_prev.state[i] } pub fn get_mut(&mut self) -> &mut S { @@ -118,24 +104,16 @@ impl< } pub(crate) fn new_filtered( - ss: usize, node: VID, - base_graph: &'graph G, + eval_graph: EvalGraph<'graph, 'a, G, S, CS>, graph: GH, - storage: &'graph GraphStorage, local_state: Option<&'graph mut S>, - local_state_prev: &'graph PrevLocalState<'a, S>, - node_state: Rc>>, ) -> Self { Self { - ss, node, - base_graph, + eval_graph, graph, - storage, local_state, - local_state_prev, - node_state, } } @@ -149,10 +127,11 @@ impl< id: &AccId, a: IN, ) { - self.node_state + self.eval_graph + .node_state .borrow_mut() .shard_mut() - .accumulate_into(self.ss, self.pid(), a, id); + .accumulate_into(self.eval_graph.ss, self.pid(), a, id); } pub fn global_update>( @@ -160,10 +139,11 @@ impl< id: &AccId, a: IN, ) { - self.node_state + self.eval_graph + .node_state .borrow_mut() .global_mut() - .accumulate_global(self.ss, a, id); + .accumulate_global(self.eval_graph.ss, a, id); } /// Reads the global state for a given accumulator, returned value is the global @@ -191,7 +171,11 @@ impl< OUT: StateType, A: StateType, { - self.node_state.borrow().global().read_global(self.ss, agg) + self.eval_graph + .node_state + .borrow() + .global() + .read_global(self.eval_graph.ss, agg) } /// Read the current value of the node state using the given accumulator. @@ -204,10 +188,11 @@ impl< A: StateType, OUT: std::fmt::Debug, { - self.node_state + self.eval_graph + .node_state .borrow() .shard() - .read_with_pid(self.ss, self.pid(), agg_r) + .read_with_pid(self.eval_graph.ss, self.pid(), agg_r) .unwrap_or(ACC::finish(&ACC::zero())) } @@ -221,7 +206,12 @@ impl< A: StateType, OUT: std::fmt::Debug, { - Entry::new(self.node_state.borrow(), *agg_r, &self.node, self.ss) + Entry::new( + self.eval_graph.node_state.borrow(), + *agg_r, + &self.node, + self.eval_graph.ss, + ) } /// Read the prev value of the node state using the given accumulator. @@ -234,10 +224,11 @@ impl< A: StateType, OUT: std::fmt::Debug, { - self.node_state + self.eval_graph + .node_state .borrow() .shard() - .read_with_pid(self.ss + 1, self.pid(), agg_r) + .read_with_pid(self.eval_graph.ss + 1, self.pid(), agg_r) .unwrap_or(ACC::finish(&ACC::zero())) } @@ -249,10 +240,11 @@ impl< A: StateType, OUT: std::fmt::Debug, { - self.node_state + self.eval_graph + .node_state .borrow() .global() - .read_global(self.ss + 1, agg_r) + .read_global(self.eval_graph.ss + 1, agg_r) .unwrap_or(ACC::finish(&ACC::zero())) } } @@ -266,12 +258,8 @@ pub struct EvalPathFromNode< S, > { pub graph: GH, - pub(crate) base_graph: &'graph G, + pub(crate) base_graph: EvalGraph<'graph, 'a, G, S, CS>, pub(crate) op: Arc BoxedLIter<'graph, VID> + Send + Sync + 'graph>, - pub(crate) storage: &'graph GraphStorage, - pub(crate) ss: usize, - pub(crate) node_state: Rc>>, - pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, } impl< @@ -288,24 +276,10 @@ impl< } pub fn iter(&self) -> impl Iterator> + 'graph { - let local = self.local_state_prev; - let node_state = self.node_state.clone(); - let ss = self.ss; - let base_graph = self.base_graph; + let base_graph = self.base_graph.clone(); let graph = self.graph.clone(); - let storage = self.storage; - self.iter_refs().map(move |v| { - EvalNodeView::new_filtered( - ss, - v, - base_graph, - graph.clone(), - storage, - None, - local, - node_state.clone(), - ) - }) + self.iter_refs() + .map(move |v| EvalNodeView::new_filtered(v, base_graph.clone(), graph.clone(), None)) } } @@ -338,12 +312,8 @@ impl< fn clone(&self) -> Self { EvalPathFromNode { graph: self.graph.clone(), - base_graph: self.base_graph, + base_graph: self.base_graph.clone(), op: self.op.clone(), - storage: self.storage, - ss: self.ss, - node_state: self.node_state.clone(), - local_state_prev: self.local_state_prev, } } } @@ -372,7 +342,7 @@ impl< op: F, ) -> Self::ValueType { let graph = self.graph.clone(); - let storage = self.storage; + let storage = self.base_graph.storage; Box::new(self.iter_refs().map(move |node| op(storage, &graph, node))) } @@ -387,12 +357,12 @@ impl< &self, op: F, ) -> Self::Edges { - let local_state_prev = self.local_state_prev; - let node_state = self.node_state.clone(); - let ss = self.ss; - let storage = self.storage; + let local_state_prev = self.base_graph.local_state_prev; + let node_state = self.base_graph.node_state.clone(); + let ss = self.base_graph.ss; + let storage = self.base_graph.storage; let path = PathFromNode::new_one_hop_filtered( - self.base_graph, + self.base_graph.base_graph, self.graph.clone(), self.op.clone(), ); @@ -415,7 +385,7 @@ impl< ) -> Self::PathType { let old_op = self.op.clone(); let graph = self.graph.clone(); - let storage = self.storage; + let storage = self.base_graph.storage; let new_op = Arc::new(move || { let op = op.clone(); let graph = graph.clone(); @@ -423,17 +393,11 @@ impl< .flat_map(move |vv| op(storage, &graph, vv)) .into_dyn_boxed() }); - let ss = self.ss; - let node_state = self.node_state.clone(); - let local_state_prev = self.local_state_prev; + EvalPathFromNode { - graph: self.base_graph, - base_graph: self.base_graph, + graph: self.base_graph.base_graph, + base_graph: self.base_graph.clone(), op: new_op, - storage, - ss, - node_state, - local_state_prev, } } } @@ -456,25 +420,18 @@ impl< } fn base_graph(&self) -> &Self::BaseGraph { - &self.base_graph + &self.base_graph.base_graph } fn one_hop_filtered>( &self, filtered_graph: GHH, ) -> Self::Filtered { - let storage = self.storage; - let local_state_prev = self.local_state_prev; - let node_state = self.node_state.clone(); - let ss = self.ss; + let base_graph = self.base_graph.clone(); EvalPathFromNode { graph: filtered_graph, - base_graph: self.base_graph, + base_graph, op: self.op.clone(), - storage, - ss, - node_state, - local_state_prev, } } } @@ -497,23 +454,15 @@ impl< } fn base_graph(&self) -> &Self::BaseGraph { - &self.base_graph + &self.eval_graph.base_graph } fn one_hop_filtered>( &self, filtered_graph: GHH, ) -> Self::Filtered { - EvalNodeView::new_filtered( - self.ss, - self.node, - self.base_graph, - filtered_graph, - self.storage, - None, - self.local_state_prev, - self.node_state.clone(), - ) + let eval_graph = self.eval_graph.clone(); + EvalNodeView::new_filtered(self.node, eval_graph, filtered_graph, None) } } @@ -540,7 +489,7 @@ impl< &self, op: F, ) -> Self::ValueType { - op(self.storage, &self.graph, self.node) + op(self.eval_graph.storage, &self.graph, self.node) } fn as_props(&self) -> Self::ValueType> { @@ -554,15 +503,15 @@ impl< &self, op: F, ) -> Self::Edges { - let ss = self.ss; - let local_state_prev = self.local_state_prev; - let node_state = self.node_state.clone(); + let ss = self.eval_graph.ss; + let local_state_prev = self.eval_graph.local_state_prev; + let node_state = self.eval_graph.node_state.clone(); let node = self.node; - let storage = self.storage; + let storage = self.eval_graph.storage; let graph = self.graph.clone(); let edges = Arc::new(move || op(storage, &graph, node).into_dyn_boxed()); let edges = Edges { - base_graph: self.base_graph, + base_graph: self.eval_graph.base_graph, graph: self.graph.clone(), edges, }; @@ -584,20 +533,13 @@ impl< ) -> Self::PathType { let graph = self.graph.clone(); let node = self.node; - let storage = self.storage; - + let storage = self.eval_graph.storage; let path_op = Arc::new(move || op(storage, &graph, node).into_dyn_boxed()); - let ss = self.ss; - let local_state_prev = self.local_state_prev; - let node_state = self.node_state.clone(); + let eval_graph = self.eval_graph.clone(); EvalPathFromNode { - graph: self.base_graph, - base_graph: self.base_graph, + graph: eval_graph.base_graph, + base_graph: eval_graph, op: path_op, - storage, - local_state_prev, - node_state, - ss, } } } diff --git a/raphtory/src/db/task/task_runner.rs b/raphtory/src/db/task/task_runner.rs index 735ed98b20..344f0c9c42 100644 --- a/raphtory/src/db/task/task_runner.rs +++ b/raphtory/src/db/task/task_runner.rs @@ -15,7 +15,10 @@ use crate::{ }, db::{ api::{storage::storage_ops::GraphStorage, view::StaticGraphViewOps}, - task::node::{eval_node::EvalNodeView, eval_node_state::EVState}, + task::{ + eval_graph::EvalGraph, + node::{eval_node::EvalNodeView, eval_node_state::EVState}, + }, }, }; use rayon::{prelude::*, ThreadPool}; @@ -61,24 +64,21 @@ impl TaskRunner { let global_state_view = global_state.as_cow(); let g = self.ctx.graph(); - let mut done = true; - let node_state = EVState::rc_from(shard_state_view, global_state_view); - let local = PrevLocalState::new(prev_local_state); let mut v_ref = morcel_id * morcel_size; + for local_state in morcel { if g.has_node(VID(v_ref)) { - let mut vv = EvalNodeView::new_local( - self.ctx.ss(), - v_ref.into(), - &g, + let eval_graph = EvalGraph { + ss: self.ctx.ss(), + base_graph: &g, storage, - Some(local_state), - &local, - node_state.clone(), - ); + local_state_prev: &local, + node_state: node_state.clone(), + }; + let mut vv = EvalNodeView::new_local(v_ref.into(), eval_graph, Some(local_state)); match task.run(&mut vv) { Step::Continue => { diff --git a/raphtory/src/db/task/task_state.rs b/raphtory/src/db/task/task_state.rs index 3aaf552916..8c9d7654ca 100644 --- a/raphtory/src/db/task/task_state.rs +++ b/raphtory/src/db/task/task_state.rs @@ -9,7 +9,7 @@ pub struct Global(Arc>); #[derive(Clone, Debug)] pub struct Shard(Arc>); -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub(crate) struct PrevLocalState<'a, S> { pub(crate) state: &'a Vec, } From 1e2f7fca9e06fd034235a5378b6c99879c837c9b Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Mon, 27 May 2024 10:45:58 +0200 Subject: [PATCH 02/15] use the eval graph --- .../src/algorithms/motifs/global_temporal_three_node_motifs.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs index a2b6bd2699..51f712f1bd 100644 --- a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs @@ -173,7 +173,8 @@ where .sorted() .permutations(2) .flat_map(|e| { - g.edge(*e.first().unwrap(), *e.get(1).unwrap()) + u.graph() + .edge(*e.first().unwrap(), *e.get(1).unwrap()) .iter() .flat_map(|edge| edge.explode()) .collect::>() From 181e3f01d286e6cd12d36c1149e2551d6794c13f Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Mon, 27 May 2024 16:13:16 +0200 Subject: [PATCH 03/15] make private arrow crate a submodule and tweak all the scripts accordingly --- .github/workflows/test_python_workflow.yml | 4 +-- .github/workflows/test_rust_workflow.yml | 4 +-- .gitmodules | 3 ++ Cargo.toml | 4 +++ raphtory-arrow-private | 1 + raphtory-benchmark/Cargo.toml | 2 +- raphtory-cypher/Cargo.toml | 2 +- raphtory/Cargo.toml | 4 +-- scripts/activate_private_arrow.py | 32 +++++++++++++++++++++ scripts/deactivate_private_arrow.py | 33 ++++++++++++++++++++++ scripts/flip_ra.py | 24 ---------------- 11 files changed, 81 insertions(+), 32 deletions(-) create mode 100644 .gitmodules create mode 160000 raphtory-arrow-private create mode 100755 scripts/activate_private_arrow.py create mode 100755 scripts/deactivate_private_arrow.py delete mode 100755 scripts/flip_ra.py diff --git a/.github/workflows/test_python_workflow.yml b/.github/workflows/test_python_workflow.yml index 02aa9eaa50..ce7dfbcccb 100644 --- a/.github/workflows/test_python_workflow.yml +++ b/.github/workflows/test_python_workflow.yml @@ -59,8 +59,8 @@ jobs: with: python-version: ${{ matrix.python }} cache: 'pip' - - name: Flip raphtory-arrow in Cargo.toml - run: python ./scripts/flip_ra.py Cargo.toml + - name: Activate raphtory-arrow in Cargo.toml + run: python ./scripts/activate_private_arrow.py Cargo.toml - name: Run Maturin develop uses: PyO3/maturin-action@v1 with: diff --git a/.github/workflows/test_rust_workflow.yml b/.github/workflows/test_rust_workflow.yml index 680df601e2..4340c0019f 100644 --- a/.github/workflows/test_rust_workflow.yml +++ b/.github/workflows/test_rust_workflow.yml @@ -52,8 +52,8 @@ jobs: name: Cargo cache with: cache-all-crates: true - - name: Flip raphtory-arrow in Cargo.toml - run: python ./scripts/flip_ra.py + - name: Activate raphtory-arrow in Cargo.toml + run: python ./scripts/activate_private_arrow.py - name: Install bininstall uses: cargo-bins/cargo-binstall@main - name: Install nextest diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..12818e631f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "raphtory-arrow-private"] + path = raphtory-arrow-private + url = git@github.com:Pometry/raphtory-arrow.git diff --git a/Cargo.toml b/Cargo.toml index dd3ba83662..52407e7b92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,10 @@ inherits = "release" debug = true [workspace.dependencies] +#[public-arrow] +raphtory-arrow = { version = "0.8.1", path = "raphtory-arrow" } +#[private-arrow] +# raphtory-arrow = { path = "raphtory-arrow-private", package = "raphtory-arrow-private" } async-graphql = { version = "6.0.11", features = ["dynamic-schema"] } async-graphql-poem = "6.0.11" dynamic-graphql = "0.8.1" diff --git a/raphtory-arrow-private b/raphtory-arrow-private new file mode 160000 index 0000000000..78f231a104 --- /dev/null +++ b/raphtory-arrow-private @@ -0,0 +1 @@ +Subproject commit 78f231a10456a514bd56ed5912a18f431f245b87 diff --git a/raphtory-benchmark/Cargo.toml b/raphtory-benchmark/Cargo.toml index c5ac34388d..131ce09534 100644 --- a/raphtory-benchmark/Cargo.toml +++ b/raphtory-benchmark/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" criterion = { workspace = true } raphtory = { path = "../raphtory", features = ["io"] } raphtory-graphql = { path = "../raphtory-graphql", version = "0.8.1" } -raphtory-arrow = { path = "../raphtory-arrow", version = "0.8.1" } +raphtory-arrow.workspace = true sorted_vector_map = { workspace = true } rand = { workspace = true } rayon = { workspace = true } diff --git a/raphtory-cypher/Cargo.toml b/raphtory-cypher/Cargo.toml index 1ac40e906b..e142fa9719 100644 --- a/raphtory-cypher/Cargo.toml +++ b/raphtory-cypher/Cargo.toml @@ -15,7 +15,7 @@ edition.workspace = true [dependencies] raphtory = { path = "../raphtory" } -raphtory-arrow = { path = "../raphtory-arrow", optional = true, version = "0.8.1"} +raphtory-arrow = { workspace = true, optional = true } arrow.workspace = true arrow-buffer.workspace = true arrow-schema.workspace = true diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index c2aab2e7fd..e68f0825e3 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -15,7 +15,7 @@ homepage.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -raphtory-api = { path = "../raphtory-api", version="0.8.1" } +raphtory-api = { path = "../raphtory-api", version = "0.8.1" } bincode = { workspace = true } chrono = { workspace = true } itertools = { workspace = true } @@ -74,7 +74,7 @@ tempfile = { workspace = true, optional = true } bytemuck = { workspace = true, optional = true } rpds = { workspace = true, optional = true } thread_local = { workspace = true, optional = true } -raphtory-arrow = { path = "../raphtory-arrow", version="0.8.1", optional = true } +raphtory-arrow = { workspace = true, optional = true } [dev-dependencies] csv = { workspace = true } diff --git a/scripts/activate_private_arrow.py b/scripts/activate_private_arrow.py new file mode 100755 index 0000000000..582d5ec9ed --- /dev/null +++ b/scripts/activate_private_arrow.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +import subprocess +from pathlib import Path +import re + +directory = "./raphtory-arrow" +root_dir = Path(__file__).parent.parent +toml_file = root_dir / "Cargo.toml" + +with open(toml_file, "r") as f: + lines = f.readlines() + +for i, line in enumerate(lines[:-1]): + if "#[private-arrow]" in line: + next_line = lines[i + 1] + if next_line.strip().startswith("#") and "raphtory-arrow" in next_line: + lines[i + 1] = re.sub(r"#\s*", "", next_line, 1) + if "#[public-arrow]" in line: + next_line = lines[i + 1] + if next_line.strip().startswith("raphtory-arrow"): + lines[i + 1] = next_line.replace("raphtory-arrow", "# raphtory-arrow", 1) + +with open(toml_file, "w") as f: + f.writelines(lines) + +subprocess.run( + [ + "git", + "submodule", + "init", + ] +) diff --git a/scripts/deactivate_private_arrow.py b/scripts/deactivate_private_arrow.py new file mode 100755 index 0000000000..cb4990cf0f --- /dev/null +++ b/scripts/deactivate_private_arrow.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +import subprocess +from pathlib import Path +import re + +directory = "./raphtory-arrow" +root_dir = Path(__file__).parent.parent +toml_file = root_dir / "Cargo.toml" + +with open(toml_file, "r") as f: + lines = f.readlines() + +for i, line in enumerate(lines[:-1]): + if "#[public-arrow]" in line: + next_line = lines[i + 1] + if next_line.strip().startswith("#") and "raphtory-arrow" in next_line: + lines[i + 1] = re.sub(r"#\s*", "", next_line, 1) + if "#[private-arrow]" in line: + next_line = lines[i + 1] + if next_line.strip().startswith("raphtory-arrow"): + lines[i + 1] = next_line.replace("raphtory-arrow", "# raphtory-arrow", 1) + +with open(toml_file, "w") as f: + f.writelines(lines) + +subprocess.run( + [ + "git", + "submodule", + "deinit", + "raphtory-arrow-private" + ] +) diff --git a/scripts/flip_ra.py b/scripts/flip_ra.py deleted file mode 100755 index 73e21ebaba..0000000000 --- a/scripts/flip_ra.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 -import shutil -import os, stat -import subprocess - -directory = "./raphtory-arrow" - -def remove_readonly(func, path, _): - "Clear the readonly bit and reattempt the removal" - os.chmod(path, stat.S_IWRITE) - func(path) - -shutil.rmtree(directory, onerror=remove_readonly) - -subprocess.run( - [ - "git", - "clone", - "--depth", - "1", - "git@github.com:Pometry/raphtory-arrow.git", - "raphtory-arrow", - ] -) From a69a6e49c2db8723a02ecfd63a3a472655d8be94 Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Mon, 27 May 2024 16:14:46 +0200 Subject: [PATCH 04/15] arrow interface improvements --- .../global_temporal_three_node_motifs.rs | 2 +- raphtory/src/arrow/graph_impl/core_ops.rs | 18 +-- raphtory/src/arrow/storage_interface/node.rs | 106 ++++++++++-------- raphtory/src/arrow/storage_interface/nodes.rs | 24 +--- .../src/arrow/storage_interface/nodes_ref.rs | 20 +--- raphtory/src/db/api/storage/storage_ops.rs | 6 +- 6 files changed, 84 insertions(+), 92 deletions(-) diff --git a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs index 51f712f1bd..f8a618bad8 100644 --- a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs @@ -7,7 +7,7 @@ use crate::{ compute_state::ComputeStateVec, }, db::{ - api::view::{GraphViewOps, NodeViewOps, *}, + api::view::*, graph::views::node_subgraph::NodeSubgraph, task::{ context::Context, diff --git a/raphtory/src/arrow/graph_impl/core_ops.rs b/raphtory/src/arrow/graph_impl/core_ops.rs index ba032f2095..46ca6491e7 100644 --- a/raphtory/src/arrow/graph_impl/core_ops.rs +++ b/raphtory/src/arrow/graph_impl/core_ops.rs @@ -33,7 +33,7 @@ use crate::{ }; use itertools::Itertools; use polars_arrow::datatypes::ArrowDataType; -use raphtory_arrow::{properties::Properties, GID}; +use raphtory_arrow::{properties::Properties, GidRef, GID}; use rayon::prelude::*; impl CoreGraphOps for ArrowGraph { fn unfiltered_num_nodes(&self) -> usize { @@ -97,17 +97,17 @@ impl CoreGraphOps for ArrowGraph { fn node_id(&self, v: VID) -> u64 { match self.inner.node_gid(v).unwrap() { - GID::U64(n) => n, - GID::I64(n) => n as u64, - GID::Str(s) => s.id(), + GidRef::U64(n) => n, + GidRef::I64(n) => n as u64, + GidRef::Str(s) => s.id(), } } fn node_name(&self, v: VID) -> String { match self.inner.node_gid(v).unwrap() { - GID::U64(n) => n.to_string(), - GID::I64(n) => n.to_string(), - GID::Str(s) => s, + GidRef::U64(n) => n.to_string(), + GidRef::I64(n) => n.to_string(), + GidRef::Str(s) => s.to_owned(), } } @@ -200,7 +200,7 @@ impl CoreGraphOps for ArrowGraph { } fn core_nodes(&self) -> NodesStorage { - NodesStorage::Arrow(ArrowNodesOwned::new(&self.inner)) + NodesStorage::Arrow(ArrowNodesOwned::new(self.inner.clone())) } fn core_node_entry(&self, vid: VID) -> NodeStorageEntry { @@ -208,7 +208,7 @@ impl CoreGraphOps for ArrowGraph { } fn core_node_arc(&self, vid: VID) -> NodeOwnedEntry { - NodeOwnedEntry::Arrow(ArrowOwnedNode::new(&self.inner, vid)) + NodeOwnedEntry::Arrow(ArrowOwnedNode::new(self.inner.clone(), vid)) } fn core_edge_arc(&self, eid: ELID) -> EdgeOwnedEntry { diff --git a/raphtory/src/arrow/storage_interface/node.rs b/raphtory/src/arrow/storage_interface/node.rs index 06c95d2f9c..113c7b3902 100644 --- a/raphtory/src/arrow/storage_interface/node.rs +++ b/raphtory/src/arrow/storage_interface/node.rs @@ -1,6 +1,6 @@ use crate::{ core::{ - entities::{edges::edge_ref::EdgeRef, LayerIds, EID, VID}, + entities::{edges::edge_ref::EdgeRef, nodes::input_node::InputNode, LayerIds, EID, VID}, Direction, }, db::api::{ @@ -15,32 +15,28 @@ use crate::{ use itertools::Itertools; use raphtory_arrow::{ graph::TemporalGraph, graph_fragment::TempColGraphFragment, properties::Properties, - timestamps::TimeStamps, + timestamps::TimeStamps, GidRef, GID, }; use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use std::{iter, sync::Arc}; #[derive(Copy, Clone, Debug)] pub struct ArrowNode<'a> { - pub(super) properties: Option<&'a Properties>, - pub(super) layers: &'a Arc<[TempColGraphFragment]>, + graph: &'a TemporalGraph, pub(super) vid: VID, } impl<'a> ArrowNode<'a> { pub(crate) fn new(graph: &'a TemporalGraph, vid: VID) -> Self { - Self { - properties: graph.node_properties(), - layers: graph.arc_layers(), - vid, - } + Self { graph, vid } } pub fn out_edges(self, layers: &'a LayerIds) -> impl Iterator + 'a { match layers { LayerIds::None => LayerVariants::None(iter::empty()), LayerIds::All => LayerVariants::All( - self.layers + self.graph + .layers() .iter() .enumerate() .map(|(layer_id, layer)| { @@ -54,7 +50,7 @@ impl<'a> ArrowNode<'a> { .kmerge_by(|e1, e2| e1.remote() <= e2.remote()), ), LayerIds::One(layer_id) => LayerVariants::One( - self.layers[*layer_id] + self.graph.layers()[*layer_id] .nodes_storage() .out_adj_list(self.vid) .map(move |(eid, dst)| { @@ -64,7 +60,7 @@ impl<'a> ArrowNode<'a> { LayerIds::Multiple(ids) => LayerVariants::Multiple( ids.iter() .map(|&layer_id| { - self.layers[layer_id] + self.graph.layers()[layer_id] .nodes_storage() .out_adj_list(self.vid) .map(move |(eid, dst)| { @@ -80,7 +76,8 @@ impl<'a> ArrowNode<'a> { match layers { LayerIds::None => LayerVariants::None(iter::empty()), LayerIds::All => LayerVariants::All( - self.layers + self.graph + .layers() .iter() .enumerate() .map(|(layer_id, layer)| { @@ -94,7 +91,7 @@ impl<'a> ArrowNode<'a> { .kmerge_by(|e1, e2| e1.remote() <= e2.remote()), ), LayerIds::One(layer_id) => LayerVariants::One( - self.layers[*layer_id] + self.graph.layers()[*layer_id] .nodes_storage() .in_adj_list(self.vid) .map(move |(eid, src)| { @@ -104,7 +101,7 @@ impl<'a> ArrowNode<'a> { LayerIds::Multiple(ids) => LayerVariants::Multiple( ids.iter() .map(|&layer_id| { - self.layers[layer_id] + self.graph.layers()[layer_id] .nodes_storage() .in_adj_list(self.vid) .map(move |(eid, src)| { @@ -125,8 +122,9 @@ impl<'a> ArrowNode<'a> { let mut additions = match layer_ids { LayerIds::None => Vec::with_capacity(1), LayerIds::All => { - let mut additions = Vec::with_capacity(self.layers.len() + 1); - self.layers + let mut additions = Vec::with_capacity(self.graph.layers().len() + 1); + self.graph + .layers() .par_iter() .map(|l| { TimeStamps::new(l.nodes_storage().additions().value(self.vid.index()), None) @@ -136,7 +134,7 @@ impl<'a> ArrowNode<'a> { } LayerIds::One(id) => { vec![TimeStamps::new( - self.layers[*id] + self.graph.layers()[*id] .nodes_storage() .additions() .value(self.vid.index()), @@ -148,7 +146,7 @@ impl<'a> ArrowNode<'a> { ids.par_iter() .map(|l| { TimeStamps::new( - self.layers[*l] + self.graph.layers()[*l] .nodes_storage() .additions() .value(self.vid.index()), @@ -159,7 +157,7 @@ impl<'a> ArrowNode<'a> { additions } }; - if let Some(props) = self.properties { + if let Some(props) = self.graph.node_properties() { let timestamps = props.temporal_props.timestamps::(self.vid); if timestamps.len() > 0 { let ts = timestamps.times(); @@ -174,15 +172,15 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { fn degree(self, layers: &LayerIds, dir: Direction) -> usize { let single_layer = match layers { LayerIds::None => return 0, - LayerIds::All => match self.layers.len() { + LayerIds::All => match self.graph.layers().len() { 0 => return 0, - 1 => Some(&self.layers[0]), + 1 => Some(&self.graph.layers()[0]), _ => None, }, - LayerIds::One(id) => Some(&self.layers[*id]), + LayerIds::One(id) => Some(&self.graph.layers()[*id]), LayerIds::Multiple(ids) => match ids.len() { 0 => return 0, - 1 => Some(&self.layers[ids[0]]), + 1 => Some(&self.graph.layers()[ids[0]]), _ => None, }, }; @@ -221,7 +219,8 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { } fn tprop(self, prop_id: usize) -> impl TPropOps<'a> { - self.properties + self.graph + .node_properties() .unwrap() .temporal_props .prop(self.vid, prop_id) @@ -247,30 +246,46 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { self.vid } + fn id(self) -> u64 { + match self.graph.node_gid(self.vid).unwrap() { + GidRef::U64(v) => v, + GidRef::I64(v) => v as u64, + GidRef::Str(v) => v.id(), + } + } + fn name(self) -> Option<&'a str> { - todo!() + match self.graph.node_gid(self.vid).unwrap() { + GidRef::U64(_) => None, + GidRef::I64(_) => None, + GidRef::Str(v) => Some(v), + } } fn find_edge(self, dst: VID, layer_ids: &LayerIds) -> Option { match layer_ids { LayerIds::None => None, - LayerIds::All => match self.layers.len() { + LayerIds::All => match self.graph.layers().len() { 0 => None, 1 => { - let eid = self.layers[0].nodes_storage().find_edge(self.vid, dst)?; + let eid = self.graph.layers()[0] + .nodes_storage() + .find_edge(self.vid, dst)?; Some(EdgeRef::new_outgoing(eid, self.vid, dst).at_layer(0)) } _ => todo!("multilayer edge views not implemented in arrow yet"), }, LayerIds::One(id) => { - let eid = self.layers[*id].nodes_storage().find_edge(self.vid, dst)?; + let eid = self.graph.layers()[*id] + .nodes_storage() + .find_edge(self.vid, dst)?; Some(EdgeRef::new_outgoing(eid, self.vid, dst).at_layer(*id)) } LayerIds::Multiple(ids) => match ids.len() { 0 => None, 1 => { let layer = ids[0]; - let eid = self.layers[layer] + let eid = self.graph.layers()[layer] .nodes_storage() .find_edge(self.vid, dst)?; Some(EdgeRef::new_outgoing(eid, self.vid, dst).at_layer(layer)) @@ -283,23 +298,17 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { #[derive(Clone, Debug)] pub struct ArrowOwnedNode { - properties: Option>, - layers: Arc<[TempColGraphFragment]>, + graph: Arc, vid: VID, } impl ArrowOwnedNode { - pub(crate) fn new(graph: &TemporalGraph, vid: VID) -> Self { - Self { - properties: graph.node_properties().cloned(), - layers: graph.arc_layers().clone(), - vid, - } + pub(crate) fn new(graph: Arc, vid: VID) -> Self { + Self { graph, vid } } pub fn as_ref(&self) -> ArrowNode { ArrowNode { - properties: self.properties.as_ref(), - layers: &self.layers, + graph: &self.graph, vid: self.vid, } } @@ -308,7 +317,7 @@ impl ArrowOwnedNode { match layers { LayerIds::None => LayerVariants::None(iter::empty()), LayerIds::All => { - let layers = self.layers; + let layers = self.graph.arc_layers().clone(); LayerVariants::All( (0..layers.len()) .map(move |layer_id| { @@ -328,7 +337,7 @@ impl ArrowOwnedNode { ) } LayerIds::One(layer_id) => { - let adj = self.layers[layer_id] + let adj = self.graph.layers()[layer_id] .nodes_storage() .adj_out() .clone() @@ -343,7 +352,7 @@ impl ArrowOwnedNode { (0..ids.len()) .map(move |i| { let layer_id = ids[i]; - let adj = self.layers[layer_id] + let adj = self.graph.layers()[layer_id] .nodes_storage() .adj_out() .clone() @@ -364,7 +373,7 @@ impl ArrowOwnedNode { match layers { LayerIds::None => LayerVariants::None(iter::empty()), LayerIds::All => { - let layers = self.layers; + let layers = self.graph.arc_layers().clone(); LayerVariants::All( (0..layers.len()) .map(move |layer_id| { @@ -390,7 +399,7 @@ impl ArrowOwnedNode { ) } LayerIds::One(layer_id) => { - let layer = &self.layers[layer_id]; + let layer = self.graph.layer(layer_id); let eids = layer .nodes_storage() .adj_in_edges() @@ -414,7 +423,7 @@ impl ArrowOwnedNode { (0..ids.len()) .map(move |i| { let layer_id = ids[i]; - let layer = &self.layers[layer_id]; + let layer = self.graph.layer(layer_id); let eids = layer .nodes_storage() .adj_in_edges() @@ -483,6 +492,11 @@ impl<'a> NodeStorageOps<'a> for &'a ArrowOwnedNode { self.vid } + #[inline] + fn id(self) -> u64 { + self.as_ref().id() + } + fn name(self) -> Option<&'a str> { self.as_ref().name() } diff --git a/raphtory/src/arrow/storage_interface/nodes.rs b/raphtory/src/arrow/storage_interface/nodes.rs index bb894416a9..281b750832 100644 --- a/raphtory/src/arrow/storage_interface/nodes.rs +++ b/raphtory/src/arrow/storage_interface/nodes.rs @@ -10,33 +10,19 @@ use std::sync::Arc; #[derive(Clone, Debug)] pub struct ArrowNodesOwned { - num_nodes: usize, - properties: Option>, - layers: Arc<[TempColGraphFragment]>, + graph: Arc, } impl ArrowNodesOwned { - pub(crate) fn new(graph: &TemporalGraph) -> Self { - Self { - num_nodes: graph.num_nodes(), - properties: graph.node_properties().cloned(), - layers: graph.layers().into(), - } + pub(crate) fn new(graph: Arc) -> Self { + Self { graph } } pub fn node(&self, vid: VID) -> ArrowNode { - ArrowNode { - properties: self.properties.as_ref(), - layers: &self.layers, - vid, - } + ArrowNode::new(&self.graph, vid) } pub fn as_ref(&self) -> ArrowNodesRef { - ArrowNodesRef { - num_nodes: self.num_nodes, - properties: self.properties.as_ref(), - layers: &self.layers, - } + ArrowNodesRef::new(&self.graph) } } diff --git a/raphtory/src/arrow/storage_interface/nodes_ref.rs b/raphtory/src/arrow/storage_interface/nodes_ref.rs index a2f9c5d20e..bc07730e27 100644 --- a/raphtory/src/arrow/storage_interface/nodes_ref.rs +++ b/raphtory/src/arrow/storage_interface/nodes_ref.rs @@ -7,35 +7,25 @@ use std::sync::Arc; #[derive(Copy, Clone, Debug)] pub struct ArrowNodesRef<'a> { - pub(super) num_nodes: usize, - pub(super) properties: Option<&'a Properties>, - pub(super) layers: &'a Arc<[TempColGraphFragment]>, + graph: &'a TemporalGraph, } impl<'a> ArrowNodesRef<'a> { pub(crate) fn new(graph: &'a TemporalGraph) -> Self { - Self { - num_nodes: graph.num_nodes(), - properties: graph.node_properties(), - layers: graph.arc_layers(), - } + Self { graph } } pub fn node(self, vid: VID) -> ArrowNode<'a> { - ArrowNode { - properties: self.properties, - layers: self.layers, - vid, - } + ArrowNode::new(self.graph, vid) } pub fn par_iter(self) -> impl IndexedParallelIterator> { - (0..self.num_nodes) + (0..self.graph.num_nodes()) .into_par_iter() .map(move |vid| self.node(VID(vid))) } pub fn iter(self) -> impl Iterator> { - (0..self.num_nodes).map(move |vid| self.node(VID(vid))) + (0..self.graph.num_nodes()).map(move |vid| self.node(VID(vid))) } } diff --git a/raphtory/src/db/api/storage/storage_ops.rs b/raphtory/src/db/api/storage/storage_ops.rs index 10d136afe8..1bd0119067 100644 --- a/raphtory/src/db/api/storage/storage_ops.rs +++ b/raphtory/src/db/api/storage/storage_ops.rs @@ -65,7 +65,9 @@ impl GraphStorage { match self { GraphStorage::Mem(storage) => NodesStorage::Mem(storage.nodes.clone()), #[cfg(feature = "arrow")] - GraphStorage::Arrow(storage) => NodesStorage::Arrow(ArrowNodesOwned::new(storage)), + GraphStorage::Arrow(storage) => { + NodesStorage::Arrow(ArrowNodesOwned::new(storage.clone())) + } } } @@ -82,7 +84,7 @@ impl GraphStorage { GraphStorage::Mem(storage) => NodeOwnedEntry::Mem(storage.nodes.arc_entry(vid)), #[cfg(feature = "arrow")] GraphStorage::Arrow(storage) => { - NodeOwnedEntry::Arrow(ArrowOwnedNode::new(storage, vid)) + NodeOwnedEntry::Arrow(ArrowOwnedNode::new(storage.clone(), vid)) } } } From a314dff0046e68efe08d30bfc6c6f6b506e81204 Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Mon, 27 May 2024 16:20:24 +0200 Subject: [PATCH 05/15] fix the activation script --- scripts/activate_private_arrow.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/activate_private_arrow.py b/scripts/activate_private_arrow.py index 582d5ec9ed..e326cea242 100755 --- a/scripts/activate_private_arrow.py +++ b/scripts/activate_private_arrow.py @@ -27,6 +27,8 @@ [ "git", "submodule", - "init", + "update", + "--init", + "--recursive", ] ) From e12b57cf1b94d6d14846e31baef0f8c77eb951db Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Mon, 27 May 2024 16:23:40 +0200 Subject: [PATCH 06/15] don't run cargo update here (breaks caching) --- .github/workflows/benchmark.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 9345a920e2..3e68ac2074 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -44,10 +44,6 @@ jobs: toolchain: 1.77.0 override: true components: rustfmt, clippy - - name: Cargo update - uses: actions-rs/cargo@v1 - with: - command: update - name: Run benchmark (Unix) run: | set -o pipefail From 580afcf788b508d41149bbf383eb34009c01bbd4 Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Tue, 28 May 2024 10:53:21 +0200 Subject: [PATCH 07/15] move arrow activation/deactivation logic to makefile --- .github/workflows/test_python_workflow.yml | 2 +- .github/workflows/test_rust_workflow.yml | 2 +- Makefile | 11 ++++++++++- scripts/activate_private_arrow.py | 10 ---------- scripts/deactivate_private_arrow.py | 9 --------- 5 files changed, 12 insertions(+), 22 deletions(-) diff --git a/.github/workflows/test_python_workflow.yml b/.github/workflows/test_python_workflow.yml index ce7dfbcccb..cad65b237f 100644 --- a/.github/workflows/test_python_workflow.yml +++ b/.github/workflows/test_python_workflow.yml @@ -60,7 +60,7 @@ jobs: python-version: ${{ matrix.python }} cache: 'pip' - name: Activate raphtory-arrow in Cargo.toml - run: python ./scripts/activate_private_arrow.py Cargo.toml + run: make pull-arrow - name: Run Maturin develop uses: PyO3/maturin-action@v1 with: diff --git a/.github/workflows/test_rust_workflow.yml b/.github/workflows/test_rust_workflow.yml index 4340c0019f..666cf343a4 100644 --- a/.github/workflows/test_rust_workflow.yml +++ b/.github/workflows/test_rust_workflow.yml @@ -53,7 +53,7 @@ jobs: with: cache-all-crates: true - name: Activate raphtory-arrow in Cargo.toml - run: python ./scripts/activate_private_arrow.py + run: make pull-arrow - name: Install bininstall uses: cargo-bins/cargo-binstall@main - name: Install nextest diff --git a/Makefile b/Makefile index 706a308552..6bd3acadbd 100644 --- a/Makefile +++ b/Makefile @@ -35,4 +35,13 @@ rust-test-all: cargo check -p raphtory --no-default-features --features "io" cargo check -p raphtory --no-default-features --features "python" cargo check -p raphtory --no-default-features --features "search" - cargo check -p raphtory --no-default-features --features "vectors" \ No newline at end of file + cargo check -p raphtory --no-default-features --features "vectors" + +activate-arrow: + ./scripts/activate_private_arrow.py + +deactivate-arrow: + ./scripts/deactivate_private_arrow.py + +pull-arrow: activate-arrow + git submodule update --init --recursive diff --git a/scripts/activate_private_arrow.py b/scripts/activate_private_arrow.py index e326cea242..5711de5513 100755 --- a/scripts/activate_private_arrow.py +++ b/scripts/activate_private_arrow.py @@ -22,13 +22,3 @@ with open(toml_file, "w") as f: f.writelines(lines) - -subprocess.run( - [ - "git", - "submodule", - "update", - "--init", - "--recursive", - ] -) diff --git a/scripts/deactivate_private_arrow.py b/scripts/deactivate_private_arrow.py index cb4990cf0f..c928ab6e30 100755 --- a/scripts/deactivate_private_arrow.py +++ b/scripts/deactivate_private_arrow.py @@ -22,12 +22,3 @@ with open(toml_file, "w") as f: f.writelines(lines) - -subprocess.run( - [ - "git", - "submodule", - "deinit", - "raphtory-arrow-private" - ] -) From 434a5a1ae5380e921a7ec3339ff28c402919b90a Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Tue, 28 May 2024 10:53:53 +0200 Subject: [PATCH 08/15] clean up some warnings --- raphtory-graphql/src/model/graph/edge.rs | 1 - raphtory/src/db/api/storage/edges/edges.rs | 19 ------------------- raphtory/src/db/api/storage/storage_ops.rs | 2 +- raphtory/src/db/api/view/internal/core_ops.rs | 1 - raphtory/src/db/task/node/eval_node.rs | 7 +------ 5 files changed, 2 insertions(+), 28 deletions(-) diff --git a/raphtory-graphql/src/model/graph/edge.rs b/raphtory-graphql/src/model/graph/edge.rs index cb59c1589e..a513531c0a 100644 --- a/raphtory-graphql/src/model/graph/edge.rs +++ b/raphtory-graphql/src/model/graph/edge.rs @@ -1,5 +1,4 @@ use crate::model::graph::{node::Node, property::GqlProperties}; -use async_graphql::Error; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use itertools::Itertools; use raphtory::{ diff --git a/raphtory/src/db/api/storage/edges/edges.rs b/raphtory/src/db/api/storage/edges/edges.rs index 64ca63f915..559c26f7b3 100644 --- a/raphtory/src/db/api/storage/edges/edges.rs +++ b/raphtory/src/db/api/storage/edges/edges.rs @@ -41,25 +41,6 @@ pub enum EdgesStorageRef<'a> { } impl<'a> EdgesStorageRef<'a> { - pub fn get_layer(self, eid: EID, layer_id: usize) -> EdgeStorageRef<'a> { - match self { - EdgesStorageRef::Mem(storage) => EdgeStorageRef::Mem(storage.get(eid)), - #[cfg(feature = "arrow")] - EdgesStorageRef::Arrow(storage) => EdgeStorageRef::Arrow(storage.edge(eid, layer_id)), - } - } - - #[inline] - pub fn get(self, eid: EID) -> EdgeStorageRef<'a> { - match self { - EdgesStorageRef::Mem(storage) => EdgeStorageRef::Mem(storage.get(eid)), - #[cfg(feature = "arrow")] - EdgesStorageRef::Arrow(_) => { - todo!("getting multilayer edge not implemented for arrow graph") - } - } - } - #[cfg(feature = "arrow")] pub fn iter(self, layers: LayerIds) -> impl Iterator> { match self { diff --git a/raphtory/src/db/api/storage/storage_ops.rs b/raphtory/src/db/api/storage/storage_ops.rs index 1bd0119067..fd115279fa 100644 --- a/raphtory/src/db/api/storage/storage_ops.rs +++ b/raphtory/src/db/api/storage/storage_ops.rs @@ -43,7 +43,7 @@ use crate::{ }; use itertools::Itertools; use rayon::prelude::*; -use std::{iter, sync::Arc}; +use std::iter; #[derive(Debug, Clone)] pub enum GraphStorage { diff --git a/raphtory/src/db/api/view/internal/core_ops.rs b/raphtory/src/db/api/view/internal/core_ops.rs index 1e49986684..15ef9cf494 100644 --- a/raphtory/src/db/api/view/internal/core_ops.rs +++ b/raphtory/src/db/api/view/internal/core_ops.rs @@ -28,7 +28,6 @@ use crate::{ use enum_dispatch::enum_dispatch; #[cfg(feature = "arrow")] use raphtory_arrow::timestamps::TimeStamps; -use rayon::prelude::*; use std::ops::Range; /// Core functions that should (almost-)always be implemented by pointing at the underlying graph. diff --git a/raphtory/src/db/task/node/eval_node.rs b/raphtory/src/db/task/node/eval_node.rs index 5181887161..b22383e83f 100644 --- a/raphtory/src/db/task/node/eval_node.rs +++ b/raphtory/src/db/task/node/eval_node.rs @@ -17,16 +17,11 @@ use crate::{ graph::{edges::Edges, node::NodeView, path::PathFromNode}, task::{ edge::eval_edges::EvalEdges, eval_graph::EvalGraph, node::eval_node_state::EVState, - task_state::PrevLocalState, }, }, prelude::{GraphViewOps, NodeTypesFilter}, }; -use std::{ - cell::{Ref, RefCell}, - rc::Rc, - sync::Arc, -}; +use std::{cell::Ref, sync::Arc}; pub struct EvalNodeView<'graph, 'a: 'graph, G, S, GH = &'graph G, CS: Clone = ComputeStateVec> { pub node: VID, From 9d32dfeb988a26f336914d06713ed6c4f5ba4d32 Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Tue, 28 May 2024 10:54:08 +0200 Subject: [PATCH 09/15] add temporal motifs to benchmark --- raphtory-benchmark/benches/algobench.rs | 34 ++++++++++++------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/raphtory-benchmark/benches/algobench.rs b/raphtory-benchmark/benches/algobench.rs index 2c23009d96..b879875321 100644 --- a/raphtory-benchmark/benches/algobench.rs +++ b/raphtory-benchmark/benches/algobench.rs @@ -8,7 +8,10 @@ use raphtory::{ clustering_coefficient::clustering_coefficient, local_clustering_coefficient::local_clustering_coefficient, }, - motifs::local_triangle_count::local_triangle_count, + motifs::{ + global_temporal_three_node_motifs::global_temporal_three_node_motif, + local_triangle_count::local_triangle_count, + }, }, graphgen::random_attachment::random_attachment, prelude::*, @@ -16,22 +19,6 @@ use raphtory::{ use rayon::prelude::*; mod common; - -//TODO swap to new trianglecount -// pub fn global_triangle_count_analysis(c: &mut Criterion) { -// let mut group = c.benchmark_group("global_triangle_count"); -// group.sample_size(10); -// bench(&mut group, "global_triangle_count", None, |b| { -// let g = raphtory_db::graph_loader::lotr_graph::lotr_graph(1); -// let windowed_graph = g.window(i64::MIN, i64::MAX); -// b.iter(|| { -// global_triangle_count(&windowed_graph).unwrap(); -// }); -// }); -// -// group.finish(); -// } - pub fn local_triangle_count_analysis(c: &mut Criterion) { let mut group = c.benchmark_group("local_triangle_count"); group.sample_size(10); @@ -132,6 +119,18 @@ pub fn graphgen_large_concomp(c: &mut Criterion) { group.finish() } +pub fn temporal_motifs(c: &mut Criterion) { + let mut group = c.benchmark_group("temporal_motifs"); + + bench(&mut group, "temporal_motifs", None, |b| { + let g: Graph = raphtory::graph_loader::example::lotr_graph::lotr_graph(); + + b.iter(|| global_temporal_three_node_motif(&g, 100, None)) + }); + + group.finish(); +} + criterion_group!( benches, local_triangle_count_analysis, @@ -139,5 +138,6 @@ criterion_group!( graphgen_large_clustering_coeff, graphgen_large_pagerank, graphgen_large_concomp, + temporal_motifs, ); criterion_main!(benches); From 5a14cd10370b7021f99e1fff4de6d6661008e4c6 Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Tue, 28 May 2024 11:40:32 +0200 Subject: [PATCH 10/15] fix the over-eager cleanup --- raphtory/src/arrow/storage_interface/node.rs | 5 +--- raphtory/src/arrow/storage_interface/nodes.rs | 4 +-- .../src/arrow/storage_interface/nodes_ref.rs | 5 +--- raphtory/src/db/api/storage/storage_ops.rs | 30 ++++++++++--------- raphtory/src/db/api/view/internal/core_ops.rs | 5 +++- raphtory/src/db/task/node/eval_node.rs | 2 +- raphtory/src/python/graph/edges.rs | 5 ++-- 7 files changed, 26 insertions(+), 30 deletions(-) diff --git a/raphtory/src/arrow/storage_interface/node.rs b/raphtory/src/arrow/storage_interface/node.rs index 113c7b3902..24bb91563e 100644 --- a/raphtory/src/arrow/storage_interface/node.rs +++ b/raphtory/src/arrow/storage_interface/node.rs @@ -13,10 +13,7 @@ use crate::{ }, }; use itertools::Itertools; -use raphtory_arrow::{ - graph::TemporalGraph, graph_fragment::TempColGraphFragment, properties::Properties, - timestamps::TimeStamps, GidRef, GID, -}; +use raphtory_arrow::{graph::TemporalGraph, timestamps::TimeStamps, GidRef}; use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use std::{iter, sync::Arc}; diff --git a/raphtory/src/arrow/storage_interface/nodes.rs b/raphtory/src/arrow/storage_interface/nodes.rs index 281b750832..899347cbea 100644 --- a/raphtory/src/arrow/storage_interface/nodes.rs +++ b/raphtory/src/arrow/storage_interface/nodes.rs @@ -3,9 +3,7 @@ use crate::{ core::entities::VID, }; -use raphtory_arrow::{ - graph::TemporalGraph, graph_fragment::TempColGraphFragment, properties::Properties, -}; +use raphtory_arrow::graph::TemporalGraph; use std::sync::Arc; #[derive(Clone, Debug)] diff --git a/raphtory/src/arrow/storage_interface/nodes_ref.rs b/raphtory/src/arrow/storage_interface/nodes_ref.rs index bc07730e27..441bdbad60 100644 --- a/raphtory/src/arrow/storage_interface/nodes_ref.rs +++ b/raphtory/src/arrow/storage_interface/nodes_ref.rs @@ -1,9 +1,6 @@ use crate::{arrow::storage_interface::node::ArrowNode, core::entities::VID}; -use raphtory_arrow::{ - graph::TemporalGraph, graph_fragment::TempColGraphFragment, properties::Properties, -}; +use raphtory_arrow::graph::TemporalGraph; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use std::sync::Arc; #[derive(Copy, Clone, Debug)] pub struct ArrowNodesRef<'a> { diff --git a/raphtory/src/db/api/storage/storage_ops.rs b/raphtory/src/db/api/storage/storage_ops.rs index fd115279fa..e6e387d2e8 100644 --- a/raphtory/src/db/api/storage/storage_ops.rs +++ b/raphtory/src/db/api/storage/storage_ops.rs @@ -1,17 +1,3 @@ -#[cfg(feature = "arrow")] -use crate::{ - arrow::storage_interface::{ - edges::ArrowEdges, - edges_ref::ArrowEdgesRef, - node::{ArrowNode, ArrowOwnedNode}, - nodes::ArrowNodesOwned, - nodes_ref::ArrowNodesRef, - }, - db::api::storage::variants::storage_variants::StorageVariants, -}; -#[cfg(feature = "arrow")] -use raphtory_arrow::graph::TemporalGraph; - use crate::{ core::{ entities::{edges::edge_ref::EdgeRef, LayerIds, EID, VID}, @@ -45,6 +31,22 @@ use itertools::Itertools; use rayon::prelude::*; use std::iter; +#[cfg(feature = "arrow")] +use crate::{ + arrow::storage_interface::{ + edges::ArrowEdges, + edges_ref::ArrowEdgesRef, + node::{ArrowNode, ArrowOwnedNode}, + nodes::ArrowNodesOwned, + nodes_ref::ArrowNodesRef, + }, + db::api::storage::variants::storage_variants::StorageVariants, +}; +#[cfg(feature = "arrow")] +use raphtory_arrow::graph::TemporalGraph; +#[cfg(feature = "arrow")] +use std::sync::Arc; + #[derive(Debug, Clone)] pub enum GraphStorage { Mem(LockedGraph), diff --git a/raphtory/src/db/api/view/internal/core_ops.rs b/raphtory/src/db/api/view/internal/core_ops.rs index 15ef9cf494..97bc4d700d 100644 --- a/raphtory/src/db/api/view/internal/core_ops.rs +++ b/raphtory/src/db/api/view/internal/core_ops.rs @@ -26,9 +26,12 @@ use crate::{ }, }; use enum_dispatch::enum_dispatch; +use std::ops::Range; + #[cfg(feature = "arrow")] use raphtory_arrow::timestamps::TimeStamps; -use std::ops::Range; +#[cfg(feature = "arrow")] +use rayon::prelude::*; /// Core functions that should (almost-)always be implemented by pointing at the underlying graph. #[enum_dispatch] diff --git a/raphtory/src/db/task/node/eval_node.rs b/raphtory/src/db/task/node/eval_node.rs index b22383e83f..3c252467d8 100644 --- a/raphtory/src/db/task/node/eval_node.rs +++ b/raphtory/src/db/task/node/eval_node.rs @@ -12,7 +12,7 @@ use crate::{ api::{ properties::Properties, storage::storage_ops::GraphStorage, - view::{internal::OneHopFilter, Base, BaseNodeViewOps, BoxedLIter, IntoDynBoxed}, + view::{internal::OneHopFilter, BaseNodeViewOps, BoxedLIter, IntoDynBoxed}, }, graph::{edges::Edges, node::NodeView, path::PathFromNode}, task::{ diff --git a/raphtory/src/python/graph/edges.rs b/raphtory/src/python/graph/edges.rs index 5eed35478a..ccae2313c6 100644 --- a/raphtory/src/python/graph/edges.rs +++ b/raphtory/src/python/graph/edges.rs @@ -19,9 +19,8 @@ use crate::{ ArcStringIterable, ArcStringVecIterable, BoolIterable, I64Iterable, I64VecIterable, NestedArcStringIterable, NestedArcStringVecIterable, NestedBoolIterable, NestedI64VecIterable, NestedOptionI64Iterable, NestedU64U64Iterable, - NestedUtcDateTimeIterable, NestedVecUtcDateTimeIterable, OptionArcStringIterable, - OptionI64Iterable, OptionUtcDateTimeIterable, OptionVecUtcDateTimeIterable, - U64U64Iterable, + NestedUtcDateTimeIterable, NestedVecUtcDateTimeIterable, OptionI64Iterable, + OptionUtcDateTimeIterable, OptionVecUtcDateTimeIterable, U64U64Iterable, }, }, utils::{ From 4dcf413e08d58d2d0ab46685e2ee9467f0ddcb5f Mon Sep 17 00:00:00 2001 From: narnolddd Date: Tue, 28 May 2024 15:14:36 +0100 Subject: [PATCH 11/15] tidy up and add k merge optimisation --- .../global_temporal_three_node_motifs.rs | 44 +++-- .../local_temporal_three_node_motifs.rs | 163 +++++++++++++----- 2 files changed, 139 insertions(+), 68 deletions(-) diff --git a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs index f8a618bad8..9867cb3734 100644 --- a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs @@ -36,9 +36,9 @@ where .collect(); let events = evv .edges() - .explode() .iter() - .sorted_by_key(|e| e.time_and_index()) + .map(|e| e.explode()) + .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) .map(|edge| { if edge.src().id() == evv.id() { star_event(neigh_map[&edge.dst().id()], 1, edge.time().unwrap()) @@ -84,8 +84,7 @@ where let events: Vec = out .iter() .flat_map(|e| e.explode()) - .chain(inc.iter().flat_map(|e| e.explode())) - .sorted_by_key(|e| e.time_and_index()) + .merge_by(inc.iter().flat_map(|e| e.explode()), |e1,e2| e1.time_and_index() < e2.time_and_index()) .map(|e| { two_node_event( if e.src().id() == evv.id() { 1 } else { 0 }, @@ -111,10 +110,12 @@ pub fn triangle_motifs(graph: &G, deltas: Vec, threads: Option) - where G: StaticGraphViewOps, { + // Create K-Core graph to recursively remove nodes of degree < 2 let node_set = k_core_set(graph, 2, usize::MAX, None); - let g: NodeSubgraph = graph.subgraph(node_set); - let mut ctx_sub: Context, ComputeStateVec> = Context::from(&g); + let kcore_subgraph: NodeSubgraph = graph.subgraph(node_set); + let mut ctx_subgraph: Context, ComputeStateVec> = Context::from(&kcore_subgraph); + // Triangle Accumulator let neighbours_set = accumulators::hash_set::(0); let tri_mc = deltas @@ -124,14 +125,14 @@ where let tri_clone = tri_mc.clone(); tri_mc.clone().iter().for_each(|mc| { - ctx_sub.global_agg::<[usize; 8], [usize; 8], [usize; 8], ArrConst, 8>>( + ctx_subgraph.global_agg::<[usize; 8], [usize; 8], [usize; 8], ArrConst, 8>>( *mc, ) }); - ctx_sub.agg(neighbours_set); + ctx_subgraph.agg(neighbours_set); - let step1 = ATask::new(move |u: &mut EvalNodeView, ()>| { + let neighbourhood_update_step = ATask::new(move |u: &mut EvalNodeView, ()>| { for v in u.neighbours() { if u.id() > v.id() { v.update(&neighbours_set, u.id()); @@ -140,7 +141,7 @@ where Step::Continue }); - let step2 = ATask::new(move |u: &mut EvalNodeView, ()>| { + let intersection_compute_step = ATask::new(move |u: &mut EvalNodeView, ()>| { for v in u.neighbours() { // Find triangles on the UV edge if u.id() > v.id() { @@ -172,14 +173,14 @@ where .into_iter() .sorted() .permutations(2) - .flat_map(|e| { + .map(|e| { u.graph() .edge(*e.first().unwrap(), *e.get(1).unwrap()) .iter() .flat_map(|edge| edge.explode()) .collect::>() }) - .sorted_by_key(|e| e.time_and_index()) + .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) .map(|e| { let (src_id, dst_id) = (e.src().id(), e.dst().id()); let uid = u.id(); @@ -219,11 +220,11 @@ where Step::Continue }); - let mut runner: TaskRunner, _> = TaskRunner::new(ctx_sub); + let mut runner: TaskRunner, _> = TaskRunner::new(ctx_subgraph); runner.run( - vec![Job::new(step1)], - vec![Job::new(step2)], + vec![Job::new(neighbourhood_update_step)], + vec![Job::new(intersection_compute_step)], None, |egs, _, _, _| { tri_mc.iter().map(|mc| egs.finalize::<[usize; 8], [usize;8], [usize; 8], ArrConst,8>>(mc)).collect_vec() @@ -252,12 +253,11 @@ where .collect_vec(); let star_clone = star_mc.clone(); - star_mc.iter().for_each(|mc| ctx.global_agg(*mc)); - let out1 = triangle_motifs(g, deltas.clone(), threads); + let triadic_motifs = triangle_motifs(g, deltas.clone(), threads); - let step1 = ATask::new(move |evv: &mut EvalNodeView| { + let star_count_step = ATask::new(move |evv: &mut EvalNodeView| { let star_nodes = star_motif_count(evv, deltas.clone()); for (i, star) in star_nodes.iter().enumerate() { evv.global_update(&star_mc[i], *star); @@ -266,14 +266,12 @@ where }); let mut runner: TaskRunner = TaskRunner::new(ctx); - // let star_ref = &star_mc; - runner.run( vec![], - vec![Job::new(step1)], + vec![Job::new(star_count_step)], None, |egs, _ , _ , _ | { - out1.iter().enumerate().map(|(i,tri)| { + triadic_motifs.iter().enumerate().map(|(i,tri)| { let mut tmp = egs.finalize::<[usize; 32], [usize;32], [usize; 32], ArrConst,32>>(&star_clone[i]) .iter().copied() .collect_vec(); @@ -370,4 +368,4 @@ mod motifs_test { #[cfg(feature = "arrow")] test(&arrow_graph); } -} +} \ No newline at end of file diff --git a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs index b8ecc03656..eca200b892 100644 --- a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs @@ -101,9 +101,9 @@ where .collect(); let events = evv .edges() - .explode() .iter() - .sorted_by_key(|e| e.time_and_index()) + .map(|e| e.explode()) + .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) .map(|edge| { if edge.src().id() == evv.id() { star_event(neigh_map[&edge.dst().id()], 1, edge.time().unwrap()) @@ -136,11 +136,6 @@ where { let mut results = deltas.iter().map(|_| [0; 8]).collect::>(); - // Define a closure for sorting by time_and_index() - let _sort_by_time_and_index = |e1: &EdgeView, e2: &EdgeView| -> Ordering { - Ord::cmp(&e1.time_and_index(), &e2.time_and_index()) - }; - for nb in evv.neighbours().into_iter() { let nb_id = nb.id(); let out = evv.graph().edge(evv.id(), nb_id); @@ -148,8 +143,7 @@ where let events: Vec = out .iter() .flat_map(|e| e.explode()) - .chain(inc.iter().flat_map(|e| e.explode())) - .sorted_by_key(|e| e.time_and_index()) + .merge_by(inc.iter().flat_map(|e| e.explode()), |e1,e2| e1.time_and_index() < e2.time_and_index()) .map(|e| { two_node_event( if e.src().id() == evv.id() { 1 } else { 0 }, @@ -182,22 +176,16 @@ where { let delta_len = deltas.len(); - // Define a closure for sorting by time_and_index() - let _sort_by_time_and_index = - |e1: &EdgeView, NodeSubgraph>, - e2: &EdgeView, NodeSubgraph>| - -> Ordering { Ord::cmp(&e1.time_and_index(), &e2.time_and_index()) }; - - // Define a closure for sorting by time() + // Create K-Core graph to recursively remove nodes of degree < 2 let node_set = k_core_set(graph, 2, usize::MAX, None); - let g: NodeSubgraph = graph.subgraph(node_set); - let mut ctx: Context, ComputeStateVec> = Context::from(&g); + let kcore_subgraph: NodeSubgraph = graph.subgraph(node_set); + let mut ctx_subgraph: Context, ComputeStateVec> = Context::from(&kcore_subgraph); + // Triangle Accumulator let neighbours_set = accumulators::hash_set::(1); + ctx_subgraph.agg(neighbours_set); - ctx.agg(neighbours_set); - - let step1 = ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { + let neighbourhood_update_step = ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { for v in u.neighbours() { if u.id() > v.id() { v.update(&neighbours_set, u.id()); @@ -206,7 +194,7 @@ where Step::Continue }); - let step2 = ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { + let intersection_compute_step = ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { let uu = u.get_mut(); if uu.triangle.is_empty() { uu.triangle = vec![[0usize; 8]; delta_len]; @@ -225,7 +213,7 @@ where if intersection_nbs.is_empty() { continue; } - // let mut nb_ct = 0; + intersection_nbs.iter().for_each(|w| { // For each triangle, run the triangle count. @@ -233,13 +221,13 @@ where .into_iter() .sorted() .permutations(2) - .flat_map(|e| { - g.edge(*e.first().unwrap(), *e.get(1).unwrap()) + .map(|e| { + u.graph().edge(*e.first().unwrap(), *e.get(1).unwrap()) .iter() .flat_map(|edge| edge.explode()) .collect::>() }) - .sorted_by_key(|e| e.time_and_index()) + .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) .map(|e| { let (src_id, dst_id) = (e.src().id(), e.dst().id()); let (uid, _vid) = (u.id(), v.id()); @@ -271,15 +259,11 @@ where let delta = deltas[i]; let mut tri_count = init_tri_count(2); tri_count.execute(&all_exploded, delta); - let tmp_counts: Iter = tri_count.return_counts().iter(); - - // Triangle counts are going to be WRONG without w - // update_counter(&mut vec![u, &v], motifs_count_id, tmp_counts); - + let intermediate_counts: Iter = tri_count.return_counts().iter(); let mc_u = u.get_mut(); let triangle_u = mc_u.triangle[i] .iter() - .zip(tmp_counts.clone()) + .zip(intermediate_counts.clone()) .map(|(&i1, &i2)| i1 + i2) .collect::>() .try_into() @@ -291,11 +275,11 @@ where Step::Continue }); - let mut runner: TaskRunner, _> = TaskRunner::new(ctx); + let mut runner: TaskRunner, _> = TaskRunner::new(ctx_subgraph); runner.run( - vec![Job::new(step1)], - vec![Job::read_only(step2)], + vec![Job::new(neighbourhood_update_step)], + vec![Job::read_only(intersection_compute_step)], None, |_, _, _els, mut local| { let mut tri_motifs = HashMap::new(); @@ -333,9 +317,9 @@ where ctx.agg(motifs_counter); - let out1 = triangle_motifs(g, deltas.clone(), motifs_counter, threads); + let triadic_motifs = triangle_motifs(g, deltas.clone(), motifs_counter, threads); - let step1 = ATask::new(move |evv: &mut EvalNodeView| { + let star_motif_step = ATask::new(move |evv: &mut EvalNodeView| { let two_nodes = twonode_motif_count(evv, deltas.clone()); let star_nodes = star_motif_count(evv, deltas.clone()); @@ -352,14 +336,17 @@ where let mut runner: TaskRunner = TaskRunner::new(ctx); runner.run( - vec![Job::new(step1)], + vec![Job::new(star_motif_step)], vec![], None, |_, _, _els, local| { let mut motifs = HashMap::new(); for (vref, mc) in enumerate(local) { + if mc.two_nodes.is_empty() || mc.star_nodes.is_empty() { + continue; + } let v_gid = g.node_name(vref.into()); - let triangles = out1 + let triangles = triadic_motifs .get(&v_gid) .cloned() .unwrap_or_else(|| vec![[0usize; 8]; delta_len]); @@ -408,9 +395,8 @@ mod motifs_test { graph } - #[test] - fn test_local_motif() { - let graph = load_graph(vec![ + fn load_sample_graph() -> Graph { + let edges = vec![ (1, 1, 2), (2, 1, 3), (3, 1, 4), @@ -434,7 +420,13 @@ mod motifs_test { (21, 11, 1), (22, 9, 11), (23, 11, 9), - ]); + ]; + load_graph(edges) + } + + #[test] + fn test_local_motif() { + let graph = load_sample_graph(); let test_dir = TempDir::new().unwrap(); #[cfg(feature = "arrow")] @@ -445,6 +437,7 @@ mod motifs_test { let actual = binding .iter() .map(|(k, v)| (k, v[0].clone())) + .into_iter() .collect::>>(); let expected: HashMap> = HashMap::from([ @@ -527,15 +520,95 @@ mod motifs_test { ), ]); - for ind in 3..12 { + for ind in 1..12 { assert_eq!( actual.get(&ind.to_string()).unwrap(), expected.get(&ind.to_string()).unwrap() ); } } - test(&graph); #[cfg(feature = "arrow")] test(&arrow_graph); + test(&graph); + } + + #[test] + fn test_windowed_graph() { + let g = load_sample_graph(); + let g_windowed = g.before(11).after(0); + println! {"windowed graph has {:?} vertices",g_windowed.count_nodes()} + + let binding = temporal_three_node_motif(&g_windowed, Vec::from([10]), None); + let actual = binding + .iter() + .map(|(k, v)| (k, v[0].clone())) + .into_iter() + .collect::>>(); + + let expected: HashMap> = HashMap::from([ + ( + "1".to_string(), + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, + ], + ), + ( + "2".to_string(), + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ], + ), + ( + "3".to_string(), + vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, + ], + ), + ( + "4".to_string(), + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, + ], + ), + ( + "5".to_string(), + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, + ], + ), + ( + "6".to_string(), + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ], + ), + ( + "7".to_string(), + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ], + ), + ( + "8".to_string(), + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ], + ), + ]); + + for ind in 1..8 { + assert_eq!( + actual.get(&ind.to_string()).unwrap(), + expected.get(&ind.to_string()).unwrap() + ); + } } } From f74449bdb17b7137e89cf2f780fdde94f802a22e Mon Sep 17 00:00:00 2001 From: narnolddd Date: Tue, 28 May 2024 15:15:06 +0100 Subject: [PATCH 12/15] run fmt --- .../global_temporal_three_node_motifs.rs | 16 +- .../local_temporal_three_node_motifs.rs | 180 +++++++++--------- 2 files changed, 103 insertions(+), 93 deletions(-) diff --git a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs index 9867cb3734..b5a3aab6b8 100644 --- a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs @@ -84,7 +84,9 @@ where let events: Vec = out .iter() .flat_map(|e| e.explode()) - .merge_by(inc.iter().flat_map(|e| e.explode()), |e1,e2| e1.time_and_index() < e2.time_and_index()) + .merge_by(inc.iter().flat_map(|e| e.explode()), |e1, e2| { + e1.time_and_index() < e2.time_and_index() + }) .map(|e| { two_node_event( if e.src().id() == evv.id() { 1 } else { 0 }, @@ -113,7 +115,8 @@ where // Create K-Core graph to recursively remove nodes of degree < 2 let node_set = k_core_set(graph, 2, usize::MAX, None); let kcore_subgraph: NodeSubgraph = graph.subgraph(node_set); - let mut ctx_subgraph: Context, ComputeStateVec> = Context::from(&kcore_subgraph); + let mut ctx_subgraph: Context, ComputeStateVec> = + Context::from(&kcore_subgraph); // Triangle Accumulator let neighbours_set = accumulators::hash_set::(0); @@ -125,9 +128,10 @@ where let tri_clone = tri_mc.clone(); tri_mc.clone().iter().for_each(|mc| { - ctx_subgraph.global_agg::<[usize; 8], [usize; 8], [usize; 8], ArrConst, 8>>( - *mc, - ) + ctx_subgraph + .global_agg::<[usize; 8], [usize; 8], [usize; 8], ArrConst, 8>>( + *mc, + ) }); ctx_subgraph.agg(neighbours_set); @@ -368,4 +372,4 @@ mod motifs_test { #[cfg(feature = "arrow")] test(&arrow_graph); } -} \ No newline at end of file +} diff --git a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs index eca200b892..1bf923b563 100644 --- a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs @@ -143,7 +143,9 @@ where let events: Vec = out .iter() .flat_map(|e| e.explode()) - .merge_by(inc.iter().flat_map(|e| e.explode()), |e1,e2| e1.time_and_index() < e2.time_and_index()) + .merge_by(inc.iter().flat_map(|e| e.explode()), |e1, e2| { + e1.time_and_index() < e2.time_and_index() + }) .map(|e| { two_node_event( if e.src().id() == evv.id() { 1 } else { 0 }, @@ -179,101 +181,105 @@ where // Create K-Core graph to recursively remove nodes of degree < 2 let node_set = k_core_set(graph, 2, usize::MAX, None); let kcore_subgraph: NodeSubgraph = graph.subgraph(node_set); - let mut ctx_subgraph: Context, ComputeStateVec> = Context::from(&kcore_subgraph); + let mut ctx_subgraph: Context, ComputeStateVec> = + Context::from(&kcore_subgraph); // Triangle Accumulator let neighbours_set = accumulators::hash_set::(1); ctx_subgraph.agg(neighbours_set); - let neighbourhood_update_step = ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { - for v in u.neighbours() { - if u.id() > v.id() { - v.update(&neighbours_set, u.id()); - } - } - Step::Continue - }); - - let intersection_compute_step = ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { - let uu = u.get_mut(); - if uu.triangle.is_empty() { - uu.triangle = vec![[0usize; 8]; delta_len]; - } - for v in u.neighbours() { - // Find triangles on the UV edge - let intersection_nbs: Vec = match v.entry(&neighbours_set).read_ref() { - Some(v_set) => { - let u_set = FxHashSet::from_iter(u.neighbours().id()); - let intersection = u_set.intersection(v_set).cloned().collect::>(); - intersection + let neighbourhood_update_step = + ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { + for v in u.neighbours() { + if u.id() > v.id() { + v.update(&neighbours_set, u.id()); } - None => vec![], - }; - - if intersection_nbs.is_empty() { - continue; } + Step::Continue + }); + + let intersection_compute_step = + ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { + let uu = u.get_mut(); + if uu.triangle.is_empty() { + uu.triangle = vec![[0usize; 8]; delta_len]; + } + for v in u.neighbours() { + // Find triangles on the UV edge + let intersection_nbs: Vec = match v.entry(&neighbours_set).read_ref() { + Some(v_set) => { + let u_set = FxHashSet::from_iter(u.neighbours().id()); + let intersection = u_set.intersection(v_set).cloned().collect::>(); + intersection + } + None => vec![], + }; + + if intersection_nbs.is_empty() { + continue; + } - intersection_nbs.iter().for_each(|w| { - // For each triangle, run the triangle count. - - let all_exploded = vec![u.id(), v.id(), *w] - .into_iter() - .sorted() - .permutations(2) - .map(|e| { - u.graph().edge(*e.first().unwrap(), *e.get(1).unwrap()) + intersection_nbs.iter().for_each(|w| { + // For each triangle, run the triangle count. + + let all_exploded = vec![u.id(), v.id(), *w] + .into_iter() + .sorted() + .permutations(2) + .map(|e| { + u.graph() + .edge(*e.first().unwrap(), *e.get(1).unwrap()) + .iter() + .flat_map(|edge| edge.explode()) + .collect::>() + }) + .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) + .map(|e| { + let (src_id, dst_id) = (e.src().id(), e.dst().id()); + let (uid, _vid) = (u.id(), v.id()); + if src_id == *w { + new_triangle_edge( + false, + if dst_id == uid { 0 } else { 1 }, + 0, + 0, + e.time().unwrap(), + ) + } else if dst_id == *w { + new_triangle_edge( + false, + if src_id == uid { 0 } else { 1 }, + 0, + 1, + e.time().unwrap(), + ) + } else if src_id == uid { + new_triangle_edge(true, 1, 0, 1, e.time().unwrap()) + } else { + new_triangle_edge(true, 0, 0, 0, e.time().unwrap()) + } + }) + .collect::>(); + + for i in 0..deltas.len() { + let delta = deltas[i]; + let mut tri_count = init_tri_count(2); + tri_count.execute(&all_exploded, delta); + let intermediate_counts: Iter = tri_count.return_counts().iter(); + let mc_u = u.get_mut(); + let triangle_u = mc_u.triangle[i] .iter() - .flat_map(|edge| edge.explode()) - .collect::>() - }) - .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) - .map(|e| { - let (src_id, dst_id) = (e.src().id(), e.dst().id()); - let (uid, _vid) = (u.id(), v.id()); - if src_id == *w { - new_triangle_edge( - false, - if dst_id == uid { 0 } else { 1 }, - 0, - 0, - e.time().unwrap(), - ) - } else if dst_id == *w { - new_triangle_edge( - false, - if src_id == uid { 0 } else { 1 }, - 0, - 1, - e.time().unwrap(), - ) - } else if src_id == uid { - new_triangle_edge(true, 1, 0, 1, e.time().unwrap()) - } else { - new_triangle_edge(true, 0, 0, 0, e.time().unwrap()) - } - }) - .collect::>(); - - for i in 0..deltas.len() { - let delta = deltas[i]; - let mut tri_count = init_tri_count(2); - tri_count.execute(&all_exploded, delta); - let intermediate_counts: Iter = tri_count.return_counts().iter(); - let mc_u = u.get_mut(); - let triangle_u = mc_u.triangle[i] - .iter() - .zip(intermediate_counts.clone()) - .map(|(&i1, &i2)| i1 + i2) - .collect::>() - .try_into() - .unwrap(); - mc_u.triangle[i] = triangle_u; - } - }) - } - Step::Continue - }); + .zip(intermediate_counts.clone()) + .map(|(&i1, &i2)| i1 + i2) + .collect::>() + .try_into() + .unwrap(); + mc_u.triangle[i] = triangle_u; + } + }) + } + Step::Continue + }); let mut runner: TaskRunner, _> = TaskRunner::new(ctx_subgraph); From 1d8492566bb4a40ced46f06b324405ad9d6c6623 Mon Sep 17 00:00:00 2001 From: narnolddd Date: Tue, 28 May 2024 15:37:24 +0100 Subject: [PATCH 13/15] remove single threaded version to avoid confusion --- raphtory/src/algorithms/motifs/mod.rs | 1 - .../motifs/three_node_local_single_thread.rs | 499 ------------------ 2 files changed, 500 deletions(-) delete mode 100644 raphtory/src/algorithms/motifs/three_node_local_single_thread.rs diff --git a/raphtory/src/algorithms/motifs/mod.rs b/raphtory/src/algorithms/motifs/mod.rs index ebe5a51810..2f06d08608 100644 --- a/raphtory/src/algorithms/motifs/mod.rs +++ b/raphtory/src/algorithms/motifs/mod.rs @@ -1,7 +1,6 @@ pub mod global_temporal_three_node_motifs; pub mod local_temporal_three_node_motifs; pub mod local_triangle_count; -pub mod three_node_local_single_thread; pub mod three_node_motifs; pub mod triangle_count; pub mod triplet_count; diff --git a/raphtory/src/algorithms/motifs/three_node_local_single_thread.rs b/raphtory/src/algorithms/motifs/three_node_local_single_thread.rs deleted file mode 100644 index 80a52e27e3..0000000000 --- a/raphtory/src/algorithms/motifs/three_node_local_single_thread.rs +++ /dev/null @@ -1,499 +0,0 @@ -use crate::algorithms::algorithm_result::AlgorithmResult; -/// This class regards the counting of the number of three edge, up-to-three node delta-temporal motifs in the graph, using the algorithm of Paranjape et al, Motifs in Temporal Networks (2017). -/// We point the reader to this reference for more information on the algorithm and background, but provide a short summary below. -/// -/// ## Motifs included -/// -/// ### Stars -/// -/// There are three classes (in the order they are outputted) of star motif on three nodes based on the switching behaviour of the edges between the two leaf nodes. -/// -/// - PRE: Stars of the form i<->j, i<->j, i<->k (ie two interactions with leaf j followed by one with leaf k) -/// - MID: Stars of the form i<->j, i<->k, i<->j (ie switching interactions from leaf j to leaf k, back to j again) -/// - POST: Stars of the form i<->j, i<->k, i<->k (ie one interaction with leaf j followed by two with leaf k) -/// -/// Within each of these classes is 8 motifs depending on the direction of the first to the last edge -- incoming "I" or outgoing "O". -/// These are enumerated in the order III, IIO, IOI, IOO, OII, OIO, OOI, OOO (like binary with "I"-0 and "O"-1). -/// -/// ### Two node motifs -/// -/// Also included are two node motifs, of which there are 8 when counted from the perspective of each node. These are characterised by the direction of each edge, enumerated -/// in the above order. Note that for the global graph counts, each motif is counted in both directions (a single III motif for one node is an OOO motif for the other node). -/// -/// ### Triangles -/// -/// There are 8 triangle motifs: -/// -/// 1. i --> j, k --> j, i --> k -/// 2. i --> j, k --> i, j --> k -/// 3. i --> j, j --> k, i --> k -/// 4. i --> j, i --> k, j --> k -/// 5. i --> j, k --> j, k --> i -/// 6. i --> j, k --> i, k --> j -/// 7. i --> j, j --> k, k --> i -/// 8. i --> j, i --> k, k --> j -/// -use crate::{algorithms::motifs::three_node_motifs::*, db::api::view::*}; -use std::collections::HashMap; - -fn star_motif_count<'graph, G: GraphViewOps<'graph>>(graph: &G, v: u64, delta: i64) -> [usize; 24] { - if let Some(node) = graph.node(v) { - let neigh_map: HashMap = node - .neighbours() - .iter() - .enumerate() - .map(|(num, nb)| (nb.id(), num)) - .collect(); - let mut exploded_edges = node - .edges() - .explode() - .iter() - .map(|edge| { - if edge.src().id() == v { - star_event(neigh_map[&edge.dst().id()], 1, edge.time().unwrap()) - } else { - star_event(neigh_map[&edge.src().id()], 0, edge.time().unwrap()) - } - }) - .collect::>(); - exploded_edges.sort_by_key(|e| e.time); - let mut star_count = init_star_count(neigh_map.len()); - star_count.execute(&exploded_edges, delta); - star_count.return_counts() - } else { - [0; 24] - } -} - -fn twonode_motif_count<'graph, G: GraphViewOps<'graph>>( - graph: &G, - v: u64, - delta: i64, -) -> [usize; 8] { - let mut counts = [0; 8]; - if let Some(node) = graph.node(v) { - for nb in node.neighbours().iter() { - let nb_id = nb.id(); - let out = graph.edge(node.id(), nb_id); - let inc = graph.edge(nb_id, node.id()); - let mut all_exploded = match (out, inc) { - (Some(o), Some(i)) => o - .explode() - .iter() - .chain(i.explode()) - .map(|e| { - two_node_event(if e.src().id() == v { 1 } else { 0 }, e.time().unwrap()) - }) - .collect::>(), - (Some(o), None) => o - .explode() - .iter() - .map(|e| two_node_event(1, e.time().unwrap())) - .collect::>(), - (None, Some(i)) => i - .explode() - .iter() - .map(|e| two_node_event(0, e.time().unwrap())) - .collect::>(), - (None, None) => Vec::new(), - }; - all_exploded.sort_by_key(|e| e.time); - let mut two_node_counter = init_two_node_count(); - two_node_counter.execute(&all_exploded, delta); - let two_node_result = two_node_counter.return_counts(); - for i in 0..8 { - counts[i] += two_node_result[i]; - } - } - } - counts -} - -fn triangle_motif_count<'graph, G: GraphViewOps<'graph>>( - graph: &G, - delta: i64, -) -> AlgorithmResult, Vec> { - let mut counts: HashMap> = HashMap::new(); - for u in graph.nodes() { - counts.insert(u.id(), vec![0; 8]); - } - for u in graph.nodes() { - let uid = u.id(); - for v in u.neighbours().iter().filter(|x| x.id() > uid) { - for nb in u.neighbours().iter().filter(|x| x.id() > v.id()) { - let mut tri_edges: Vec = Vec::new(); - let out = graph.edge(v.id(), nb.id()); - let inc = graph.edge(nb.id(), v.id()); - // The following code checks for triangles - match (out, inc) { - (Some(o), Some(i)) => { - tri_edges.append( - &mut o - .explode() - .iter() - .map(|e| new_triangle_edge(false, 1, 0, 1, e.time().unwrap())) - .collect::>(), - ); - tri_edges.append( - &mut i - .explode() - .iter() - .map(|e| new_triangle_edge(false, 1, 0, 0, e.time().unwrap())) - .collect::>(), - ); - } - (Some(o), None) => { - tri_edges.append( - &mut o - .explode() - .iter() - .map(|e| new_triangle_edge(false, 1, 0, 1, e.time().unwrap())) - .collect::>(), - ); - } - (None, Some(i)) => { - tri_edges.append( - &mut i - .explode() - .iter() - .map(|e| new_triangle_edge(false, 1, 0, 0, e.time().unwrap())) - .collect::>(), - ); - } - (None, None) => { - continue; - } - } - if !tri_edges.is_empty() { - let uout = graph.edge(uid, nb.id()); - let uin = graph.edge(nb.id(), uid); - match (uout, uin) { - (Some(o), Some(i)) => { - tri_edges.append( - &mut o - .explode() - .iter() - .map(|e| new_triangle_edge(false, 0, 0, 1, e.time().unwrap())) - .collect::>(), - ); - tri_edges.append( - &mut i - .explode() - .iter() - .map(|e| new_triangle_edge(false, 0, 0, 0, e.time().unwrap())) - .collect::>(), - ); - } - (Some(o), None) => { - tri_edges.append( - &mut o - .explode() - .iter() - .map(|e| new_triangle_edge(false, 0, 0, 1, e.time().unwrap())) - .collect::>(), - ); - } - (None, Some(i)) => { - tri_edges.append( - &mut i - .explode() - .iter() - .map(|e| new_triangle_edge(false, 0, 0, 0, e.time().unwrap())) - .collect::>(), - ); - } - (None, None) => { - continue; - } - } - // found triangle at this point!! - let u_to_v = match graph.edge(uid, v.id()) { - Some(edge) => { - let r = edge - .explode() - .iter() - .map(|e| new_triangle_edge(true, 1, 0, 1, e.time().unwrap())) - .collect::>(); - r.into_iter() - } - None => vec![].into_iter(), - }; - let v_to_u = match graph.edge(v.id(), uid) { - Some(edge) => { - let r = edge - .explode() - .iter() - .map(|e| new_triangle_edge(true, 0, 0, 0, e.time().unwrap())) - .collect::>(); - r.into_iter() - } - None => vec![].into_iter(), - }; - tri_edges.append(&mut u_to_v.collect::>()); - tri_edges.append(&mut v_to_u.collect::>()); - tri_edges.sort_by_key(|e| e.time); - - let mut tri_count = init_tri_count(1); - tri_count.execute(&tri_edges, delta); - let tmp_counts = tri_count.return_counts().iter(); - for id in [uid, v.id(), nb.id()] { - counts.insert( - id, - counts - .get(&id) - .unwrap() - .iter() - .zip(tmp_counts.clone()) - .map(|(&i1, &i2)| i1 + i2) - .collect::>(), - ); - } - } - } - } - } - - // Made this as i did not want to modify/damage the above working algorithm - let new_counts: HashMap> = counts - .iter() - .map(|(uid, val)| (graph.node(*uid).unwrap().node.0, val.to_owned())) - .collect(); - - let results_type = std::any::type_name::>(); - AlgorithmResult::new( - graph.clone(), - "Three node local single thread", - results_type, - new_counts, - ) -} - -/// Computes the number of each type of motif that each node participates in. -/// -/// # Arguments -/// -/// * `g` - A reference to the graph -/// * `delta` - Maximum time difference between the first and last edge of the -/// motif. NB if time for edges was given as a UNIX epoch, this should be given in seconds, otherwise -/// milliseconds should be used (if edge times were given as string) -/// -/// Returns: -/// -/// A dictionary with node ids (u64) as keys and a 40 dimensional array of motif counts as a value. The first 24 elements are star counts, -/// the next 8 are two-node motif counts and the final 8 are triangle counts. -/// -/// # Notes -/// -/// For this local count, a node is counted as participating in a motif in the following way. For star motifs, only the centre node counts -/// the motif. For two node motifs, both constituent nodes count the motif. For triangles, all three constituent nodes count the motif. -/// -/// -pub fn local_temporal_three_node_motifs<'graph, G: GraphViewOps<'graph>>( - graph: &G, - delta: i64, -) -> AlgorithmResult, Vec> { - let mut counts = triangle_motif_count(graph, delta); - - for v in graph.nodes() { - let vid = v.id(); - let two_nodes = twonode_motif_count(graph, vid, delta).to_vec(); - let tmp_stars = star_motif_count(graph, vid, delta); - let stars: Vec = tmp_stars - .iter() - .zip(two_nodes.iter().cycle().take(24)) - .map(|(&x1, &x2)| x1 - x2) - .collect(); - let mut final_cts = Vec::new(); - final_cts.extend(stars.into_iter()); - final_cts.extend(two_nodes.into_iter()); - final_cts.extend(counts.get(v.clone()).unwrap().iter()); - counts.result.insert(v.node.0, final_cts); - } - - let results_type = std::any::type_name::>(); - AlgorithmResult::new( - graph.clone(), - "Three node local single thread", - results_type, - counts.result, - ) -} - -/// Computes the number of each type of motif there is in the graph. -/// -/// # Arguments -/// -/// * `g` - A reference to the graph -/// * `delta` - Maximum time difference between the first and last edge of the -/// motif. NB if time for edges was given as a UNIX epoch, this should be given in seconds, otherwise -/// milliseconds should be used (if edge times were given as string) -/// -/// Returns: -/// -/// A 40 dimensional array with the counts of each motif, given in the same order as described in the class summary. Note that the two-node motif counts are symmetrical so it may be more useful just to consider the first four elements. -/// -/// # Notes -/// -/// This is achieved by calling the local motif counting algorithm, summing the resulting arrays and dealing with overcounted motifs: the triangles (by dividing each motif count by three) and two-node motifs (dividing by two). -/// -/// -pub fn global_temporal_three_node_motifs<'graph, G: GraphViewOps<'graph>>( - graph: &G, - delta: i64, -) -> Vec { - let counts = local_temporal_three_node_motifs(graph, delta) - .get_all_values() - .to_owned(); - let mut tmp_counts = counts.iter().fold(vec![0; 40], |acc, x| { - acc.iter() - .zip(x.iter()) - .map(|(x1, x2)| x1 + x2) - .collect::>() - }); - for ind in 32..40 { - tmp_counts[ind] /= 3; - } - tmp_counts -} - -#[cfg(test)] -mod local_motif_test { - use crate::{ - algorithms::motifs::three_node_local_single_thread::*, - db::{api::mutation::AdditionOps, graph::graph::Graph}, - prelude::NO_PROPS, - }; - use tempfile::TempDir; - - #[test] - fn test_init() { - let graph = Graph::new(); - - let vs = vec![ - (1, 2, 1), - (1, 3, 2), - (1, 4, 3), - (3, 1, 4), - (3, 4, 5), - (3, 5, 6), - (4, 5, 7), - (5, 6, 8), - (5, 8, 9), - (7, 5, 10), - (8, 5, 11), - (1, 9, 12), - (9, 1, 13), - (6, 3, 14), - (4, 8, 15), - (8, 3, 16), - (5, 10, 17), - (10, 5, 18), - (10, 8, 19), - (1, 11, 20), - (11, 1, 21), - (9, 11, 22), - (11, 9, 23), - ]; - - for (src, dst, time) in &vs { - graph.add_edge(*time, *src, *dst, NO_PROPS, None).unwrap(); - } - - let test_dir = TempDir::new().unwrap(); - #[cfg(feature = "arrow")] - let arrow_graph = graph.persist_as_arrow(test_dir.path()).unwrap(); - - fn test(graph: &G) { - // let counts = star_motif_count(&graph, 1, 100); - let counts_result = local_temporal_three_node_motifs(graph, 10); - // FIXME: Should test this - let _global_counts = global_temporal_three_node_motifs(graph, 10); - let expected: HashMap> = HashMap::from([ - ( - 1, - vec![ - 0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 0, - ], - ), - ( - 10, - vec![ - 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, - ], - ), - ( - 11, - vec![ - 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 0, - ], - ), - ( - 2, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - ], - ), - ( - 3, - vec![ - 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 1, 2, 0, - ], - ), - ( - 4, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 2, 0, - ], - ), - ( - 5, - vec![ - 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 4, 0, 0, 0, 3, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 1, 2, 1, 3, 0, 1, 1, 1, - ], - ), - ( - 6, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, - ], - ), - ( - 7, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - ], - ), - ( - 8, - vec![ - 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 1, 2, 1, 2, 0, 1, 0, 1, - ], - ), - ( - 9, - vec![ - 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 0, - ], - ), - ]); - for ind in 1..12 { - assert_eq!(counts_result.get(ind).unwrap(), expected.get(&ind).unwrap()) - } - // print!("{:?}", global_counts); - } - test(&graph); - #[cfg(feature = "arrow")] - test(&arrow_graph); - } -} From bbbf399afdaba1df5024f7aeca7515dc1d4ad035 Mon Sep 17 00:00:00 2001 From: narnolddd Date: Mon, 3 Jun 2024 16:46:07 +0100 Subject: [PATCH 14/15] caught n squared subroutine in local motifs and also noticed index for triangles wasn't matching with api description --- .../global_temporal_three_node_motifs.rs | 3 +- .../local_temporal_three_node_motifs.rs | 240 ++++++++++++------ raphtory/src/python/packages/algorithms.rs | 8 +- 3 files changed, 166 insertions(+), 85 deletions(-) diff --git a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs index b5a3aab6b8..6e28a692f7 100644 --- a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs @@ -120,6 +120,7 @@ where // Triangle Accumulator let neighbours_set = accumulators::hash_set::(0); + ctx_subgraph.agg(neighbours_set); let tri_mc = deltas .iter() @@ -134,8 +135,6 @@ where ) }); - ctx_subgraph.agg(neighbours_set); - let neighbourhood_update_step = ATask::new(move |u: &mut EvalNodeView, ()>| { for v in u.neighbours() { if u.id() > v.id() { diff --git a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs index 1bf923b563..4281cc7373 100644 --- a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs @@ -23,6 +23,7 @@ use crate::{ use itertools::{enumerate, Itertools}; use num_traits::Zero; +use raphtory_api::core::entities::VID; use rustc_hash::FxHashSet; use std::{cmp::Ordering, collections::HashMap, mem, ops::Add, slice::Iter}; /////////////////////////////////////////////////////// @@ -93,11 +94,11 @@ where G: GraphViewOps<'graph>, GH: GraphViewOps<'graph>, { - let neigh_map: HashMap = evv + let neigh_map: HashMap = evv .neighbours() .into_iter() .enumerate() - .map(|(num, nb)| (nb.id(), num)) + .map(|(num, nb)| (nb.node, num)) .collect(); let events = evv .edges() @@ -105,10 +106,10 @@ where .map(|e| e.explode()) .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) .map(|edge| { - if edge.src().id() == evv.id() { - star_event(neigh_map[&edge.dst().id()], 1, edge.time().unwrap()) + if edge.src().node == evv.node { + star_event(neigh_map[&edge.dst().node], 1, edge.time().unwrap()) } else { - star_event(neigh_map[&edge.src().id()], 0, edge.time().unwrap()) + star_event(neigh_map[&edge.src().node], 0, edge.time().unwrap()) } }) .collect::>(); @@ -137,9 +138,9 @@ where let mut results = deltas.iter().map(|_| [0; 8]).collect::>(); for nb in evv.neighbours().into_iter() { - let nb_id = nb.id(); - let out = evv.graph().edge(evv.id(), nb_id); - let inc = evv.graph().edge(nb_id, evv.id()); + let nb_id = nb.node; + let out = evv.graph().edge(evv.node, nb_id); + let inc = evv.graph().edge(nb_id, evv.node); let events: Vec = out .iter() .flat_map(|e| e.explode()) @@ -148,7 +149,7 @@ where }) .map(|e| { two_node_event( - if e.src().id() == evv.id() { 1 } else { 0 }, + if e.src().node == evv.node { 1 } else { 0 }, e.time().unwrap(), ) }) @@ -170,7 +171,6 @@ where pub fn triangle_motifs( graph: &G, deltas: Vec, - _motifs_count_id: AccId>, threads: Option, ) -> HashMap> where @@ -185,15 +185,13 @@ where Context::from(&kcore_subgraph); // Triangle Accumulator - let neighbours_set = accumulators::hash_set::(1); + let neighbours_set = accumulators::hash_set::(1); ctx_subgraph.agg(neighbours_set); let neighbourhood_update_step = ATask::new(move |u: &mut EvalNodeView, MotifCounter>| { for v in u.neighbours() { - if u.id() > v.id() { - v.update(&neighbours_set, u.id()); - } + v.update(&neighbours_set, u.node); } Step::Continue }); @@ -206,13 +204,21 @@ where } for v in u.neighbours() { // Find triangles on the UV edge - let intersection_nbs: Vec = match v.entry(&neighbours_set).read_ref() { - Some(v_set) => { - let u_set = FxHashSet::from_iter(u.neighbours().id()); - let intersection = u_set.intersection(v_set).cloned().collect::>(); - intersection + let intersection_nbs = { + match ( + u.entry(&neighbours_set) + .read_ref() + .unwrap_or(&FxHashSet::default()), + v.entry(&neighbours_set) + .read_ref() + .unwrap_or(&FxHashSet::default()), + ) { + (u_set, v_set) => { + let intersection = + u_set.intersection(v_set).cloned().collect::>(); + intersection + } } - None => vec![], }; if intersection_nbs.is_empty() { @@ -220,61 +226,62 @@ where } intersection_nbs.iter().for_each(|w| { - // For each triangle, run the triangle count. - - let all_exploded = vec![u.id(), v.id(), *w] - .into_iter() - .sorted() - .permutations(2) - .map(|e| { - u.graph() - .edge(*e.first().unwrap(), *e.get(1).unwrap()) + if w > &v.node { + // For each triangle, run the triangle count. + + let all_exploded = vec![u.node, v.node, *w] + .into_iter() + .sorted() + .permutations(2) + .map(|e| { + u.graph() + .edge(*e.get(0).unwrap(), *e.get(1).unwrap()) + .iter() + .flat_map(|edge| edge.explode()) + .collect::>() + }) + .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) + .map(|e| { + let (src_id, dst_id) = (e.src().node, e.dst().node); + let (uid, _vid) = (u.node, v.node); + if src_id == *w { + new_triangle_edge( + false, + if dst_id == uid { 0 } else { 1 }, + 0, + 0, + e.time().unwrap(), + ) + } else if dst_id == *w { + new_triangle_edge( + false, + if src_id == uid { 0 } else { 1 }, + 0, + 1, + e.time().unwrap(), + ) + } else if src_id == uid { + new_triangle_edge(true, 1, 0, 1, e.time().unwrap()) + } else { + new_triangle_edge(true, 0, 0, 0, e.time().unwrap()) + } + }) + .collect::>(); + + deltas.iter().enumerate().for_each(|(mc, delta)| { + let mut tri_count = init_tri_count(2); + tri_count.execute(&all_exploded, *delta); + let intermediate_counts: Iter = tri_count.return_counts().iter(); + let mc_u = u.get_mut(); + let triangle_u = mc_u.triangle[mc] .iter() - .flat_map(|edge| edge.explode()) - .collect::>() - }) - .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) - .map(|e| { - let (src_id, dst_id) = (e.src().id(), e.dst().id()); - let (uid, _vid) = (u.id(), v.id()); - if src_id == *w { - new_triangle_edge( - false, - if dst_id == uid { 0 } else { 1 }, - 0, - 0, - e.time().unwrap(), - ) - } else if dst_id == *w { - new_triangle_edge( - false, - if src_id == uid { 0 } else { 1 }, - 0, - 1, - e.time().unwrap(), - ) - } else if src_id == uid { - new_triangle_edge(true, 1, 0, 1, e.time().unwrap()) - } else { - new_triangle_edge(true, 0, 0, 0, e.time().unwrap()) - } + .zip(intermediate_counts.clone()) + .map(|(&i1, &i2)| i1 + i2) + .collect::>() + .try_into() + .unwrap(); + mc_u.triangle[mc] = triangle_u; }) - .collect::>(); - - for i in 0..deltas.len() { - let delta = deltas[i]; - let mut tri_count = init_tri_count(2); - tri_count.execute(&all_exploded, delta); - let intermediate_counts: Iter = tri_count.return_counts().iter(); - let mc_u = u.get_mut(); - let triangle_u = mc_u.triangle[i] - .iter() - .zip(intermediate_counts.clone()) - .map(|(&i1, &i2)| i1 + i2) - .collect::>() - .try_into() - .unwrap(); - mc_u.triangle[i] = triangle_u; } }) } @@ -285,7 +292,7 @@ where runner.run( vec![Job::new(neighbourhood_update_step)], - vec![Job::read_only(intersection_compute_step)], + vec![Job::new(intersection_compute_step)], None, |_, _, _els, mut local| { let mut tri_motifs = HashMap::new(); @@ -317,13 +324,12 @@ pub fn temporal_three_node_motif( where G: StaticGraphViewOps, { - let mut ctx: Context = g.into(); - let motifs_counter = val::(0); let delta_len = deltas.len(); + let mut ctx: Context = g.into(); - ctx.agg(motifs_counter); - - let triadic_motifs = triangle_motifs(g, deltas.clone(), motifs_counter, threads); + println!("Running triangle step"); + let triadic_motifs = triangle_motifs(g, deltas.clone(), threads); + println!("Running rest of motifs"); let star_motif_step = ATask::new(move |evv: &mut EvalNodeView| { let two_nodes = twonode_motif_count(evv, deltas.clone()); @@ -430,6 +436,82 @@ mod motifs_test { load_graph(edges) } + #[ignore] + #[test] + fn test_triangle_motif() { + let ij_kj_ik = vec![(1, 1, 2), (2, 3, 2), (3, 1, 3)]; + let g = load_graph(ij_kj_ik); + let mc = temporal_three_node_motif(&g, vec![3], None) + .iter() + .map(|(k, v)| (k.clone(), v[0].clone())) + .into_iter() + .collect::>>(); + println!("{:?}", mc.get("3").unwrap()); + + let ij_ki_jk = vec![(1, 1, 2), (2, 3, 1), (3, 2, 3)]; + let g = load_graph(ij_ki_jk); + let mc = temporal_three_node_motif(&g, vec![3], None) + .iter() + .map(|(k, v)| (k.clone(), v[0].clone())) + .into_iter() + .collect::>>(); + println!("{:?}", mc.get("3").unwrap()); + + let ij_jk_ik = vec![(1, 1, 2), (2, 2, 3), (3, 1, 3)]; + let g = load_graph(ij_jk_ik); + let mc = temporal_three_node_motif(&g, vec![3], None) + .iter() + .map(|(k, v)| (k.clone(), v[0].clone())) + .into_iter() + .collect::>>(); + println!("{:?}", mc.get("3").unwrap()); + + let ij_ik_jk = vec![(1, 1, 2), (2, 1, 3), (3, 2, 3)]; + let g = load_graph(ij_ik_jk); + let mc = temporal_three_node_motif(&g, vec![3], None) + .iter() + .map(|(k, v)| (k.clone(), v[0].clone())) + .into_iter() + .collect::>>(); + println!("{:?}", mc.get("3").unwrap()); + + let ij_kj_ki = vec![(1, 1, 2), (2, 3, 2), (3, 3, 1)]; + let g = load_graph(ij_kj_ki); + let mc = temporal_three_node_motif(&g, vec![3], None) + .iter() + .map(|(k, v)| (k.clone(), v[0].clone())) + .into_iter() + .collect::>>(); + println!("{:?}", mc.get("3").unwrap()); + + let ij_ki_kj = vec![(1, 1, 2), (2, 3, 1), (3, 3, 2)]; + let g = load_graph(ij_ki_kj); + let mc = temporal_three_node_motif(&g, vec![3], None) + .iter() + .map(|(k, v)| (k.clone(), v[0].clone())) + .into_iter() + .collect::>>(); + println!("{:?}", mc.get("3").unwrap()); + + let ij_jk_ki = vec![(1, 1, 2), (2, 2, 3), (3, 3, 1)]; + let g = load_graph(ij_jk_ki); + let mc = temporal_three_node_motif(&g, vec![3], None) + .iter() + .map(|(k, v)| (k.clone(), v[0].clone())) + .into_iter() + .collect::>>(); + println!("{:?}", mc.get("3").unwrap()); + + let ij_ik_kj = vec![(1, 1, 2), (2, 1, 3), (3, 3, 2)]; + let g = load_graph(ij_ik_kj); + let mc = temporal_three_node_motif(&g, vec![3], None) + .iter() + .map(|(k, v)| (k.clone(), v[0].clone())) + .into_iter() + .collect::>>(); + println!("{:?}", mc.get("3").unwrap()); + } + #[test] fn test_local_motif() { let graph = load_sample_graph(); diff --git a/raphtory/src/python/packages/algorithms.rs b/raphtory/src/python/packages/algorithms.rs index c260a3ede3..b92eb3013d 100644 --- a/raphtory/src/python/packages/algorithms.rs +++ b/raphtory/src/python/packages/algorithms.rs @@ -399,12 +399,12 @@ pub fn global_clustering_coefficient(g: &PyGraphView) -> f64 { /// There are 8 triangle motifs: /// /// 1. i --> j, k --> j, i --> k -/// 2. i --> j, k --> i, j --> k +/// 2. i --> j, k --> j, k --> i /// 3. i --> j, j --> k, i --> k -/// 4. i --> j, i --> k, j --> k -/// 5. i --> j, k --> j, k --> i +/// 4. i --> j, j --> k, k --> i +/// 5. i --> j, k --> i, j --> k /// 6. i --> j, k --> i, k --> j -/// 7. i --> j, j --> k, k --> i +/// 7. i --> j, i --> k, j --> k /// 8. i --> j, i --> k, k --> j /// /// Arguments: From c3fbb44a97ce6c9cc514eca21f7339ae3ebf3ea9 Mon Sep 17 00:00:00 2001 From: narnolddd Date: Mon, 3 Jun 2024 16:51:19 +0100 Subject: [PATCH 15/15] change to internal id instead of id method --- .../global_temporal_three_node_motifs.rs | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs index 6e28a692f7..0d64c52df7 100644 --- a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs @@ -18,6 +18,7 @@ use crate::{ }, }; use itertools::Itertools; +use raphtory_api::core::entities::VID; use rustc_hash::FxHashSet; use std::collections::HashMap; @@ -28,11 +29,11 @@ where G: StaticGraphViewOps, { let two_n_c = twonode_motif_count(evv, deltas.clone()); - let neigh_map: HashMap = evv + let neigh_map: HashMap = evv .neighbours() .into_iter() .enumerate() - .map(|(num, nb)| (nb.id(), num)) + .map(|(num, nb)| (nb.node, num)) .collect(); let events = evv .edges() @@ -40,10 +41,10 @@ where .map(|e| e.explode()) .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) .map(|edge| { - if edge.src().id() == evv.id() { - star_event(neigh_map[&edge.dst().id()], 1, edge.time().unwrap()) + if edge.src().node == evv.node { + star_event(neigh_map[&edge.dst().node], 1, edge.time().unwrap()) } else { - star_event(neigh_map[&edge.src().id()], 0, edge.time().unwrap()) + star_event(neigh_map[&edge.src().node], 0, edge.time().unwrap()) } }) .collect::>(); @@ -78,9 +79,9 @@ where let mut results = deltas.iter().map(|_| [0; 8]).collect::>(); for nb in evv.neighbours().into_iter() { - let nb_id = nb.id(); - let out = evv.graph().edge(evv.id(), nb_id); - let inc = evv.graph().edge(nb_id, evv.id()); + let nb_id = nb.node; + let out = evv.graph().edge(evv.node, nb_id); + let inc = evv.graph().edge(nb_id, evv.node); let events: Vec = out .iter() .flat_map(|e| e.explode()) @@ -89,7 +90,7 @@ where }) .map(|e| { two_node_event( - if e.src().id() == evv.id() { 1 } else { 0 }, + if e.src().node == evv.node { 1 } else { 0 }, e.time().unwrap(), ) }) @@ -119,7 +120,7 @@ where Context::from(&kcore_subgraph); // Triangle Accumulator - let neighbours_set = accumulators::hash_set::(0); + let neighbours_set = accumulators::hash_set::(0); ctx_subgraph.agg(neighbours_set); let tri_mc = deltas @@ -137,8 +138,8 @@ where let neighbourhood_update_step = ATask::new(move |u: &mut EvalNodeView, ()>| { for v in u.neighbours() { - if u.id() > v.id() { - v.update(&neighbours_set, u.id()); + if u.node > v.node { + v.update(&neighbours_set, u.node); } } Step::Continue @@ -147,7 +148,7 @@ where let intersection_compute_step = ATask::new(move |u: &mut EvalNodeView, ()>| { for v in u.neighbours() { // Find triangles on the UV edge - if u.id() > v.id() { + if u.node > v.node { let intersection_nbs = { match ( u.entry(&neighbours_set) @@ -172,7 +173,7 @@ where intersection_nbs.iter().for_each(|w| { // For each triangle, run the triangle count. - let all_exploded = vec![u.id(), v.id(), *w] + let all_exploded = vec![u.node, v.node, *w] .into_iter() .sorted() .permutations(2) @@ -185,8 +186,8 @@ where }) .kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index()) .map(|e| { - let (src_id, dst_id) = (e.src().id(), e.dst().id()); - let uid = u.id(); + let (src_id, dst_id) = (e.src().node, e.dst().node); + let uid = u.node; if src_id == *w { new_triangle_edge( false,