diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 9345a920e2..3e68ac2074 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -44,10 +44,6 @@ jobs: toolchain: 1.77.0 override: true components: rustfmt, clippy - - name: Cargo update - uses: actions-rs/cargo@v1 - with: - command: update - name: Run benchmark (Unix) run: | set -o pipefail diff --git a/.github/workflows/test_python_workflow.yml b/.github/workflows/test_python_workflow.yml index 02aa9eaa50..cad65b237f 100644 --- a/.github/workflows/test_python_workflow.yml +++ b/.github/workflows/test_python_workflow.yml @@ -59,8 +59,8 @@ jobs: with: python-version: ${{ matrix.python }} cache: 'pip' - - name: Flip raphtory-arrow in Cargo.toml - run: python ./scripts/flip_ra.py Cargo.toml + - name: Activate raphtory-arrow in Cargo.toml + run: make pull-arrow - name: Run Maturin develop uses: PyO3/maturin-action@v1 with: diff --git a/.github/workflows/test_rust_workflow.yml b/.github/workflows/test_rust_workflow.yml index 680df601e2..666cf343a4 100644 --- a/.github/workflows/test_rust_workflow.yml +++ b/.github/workflows/test_rust_workflow.yml @@ -52,8 +52,8 @@ jobs: name: Cargo cache with: cache-all-crates: true - - name: Flip raphtory-arrow in Cargo.toml - run: python ./scripts/flip_ra.py + - name: Activate raphtory-arrow in Cargo.toml + run: make pull-arrow - name: Install bininstall uses: cargo-bins/cargo-binstall@main - name: Install nextest diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..12818e631f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "raphtory-arrow-private"] + path = raphtory-arrow-private + url = git@github.com:Pometry/raphtory-arrow.git diff --git a/Cargo.toml b/Cargo.toml index dd3ba83662..52407e7b92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,10 @@ inherits = "release" debug = true [workspace.dependencies] +#[public-arrow] +raphtory-arrow = { version = "0.8.1", path = "raphtory-arrow" } +#[private-arrow] +# raphtory-arrow = { path = "raphtory-arrow-private", package = "raphtory-arrow-private" } async-graphql = { version = "6.0.11", features = ["dynamic-schema"] } async-graphql-poem = "6.0.11" dynamic-graphql = "0.8.1" diff --git a/Makefile b/Makefile index 706a308552..6bd3acadbd 100644 --- a/Makefile +++ b/Makefile @@ -35,4 +35,13 @@ rust-test-all: cargo check -p raphtory --no-default-features --features "io" cargo check -p raphtory --no-default-features --features "python" cargo check -p raphtory --no-default-features --features "search" - cargo check -p raphtory --no-default-features --features "vectors" \ No newline at end of file + cargo check -p raphtory --no-default-features --features "vectors" + +activate-arrow: + ./scripts/activate_private_arrow.py + +deactivate-arrow: + ./scripts/deactivate_private_arrow.py + +pull-arrow: activate-arrow + git submodule update --init --recursive diff --git a/examples/rust/src/bin/hulongbay/main.rs b/examples/rust/src/bin/hulongbay/main.rs index d3702e10f1..9ec0207363 100644 --- a/examples/rust/src/bin/hulongbay/main.rs +++ b/examples/rust/src/bin/hulongbay/main.rs @@ -2,7 +2,13 @@ #![allow(dead_code)] use itertools::Itertools; use raphtory::{ - algorithms::{components::weakly_connected_components, motifs::triangle_count::triangle_count}, + algorithms::{ + components::weakly_connected_components, + motifs::{ + global_temporal_three_node_motifs::global_temporal_three_node_motif, + triangle_count::triangle_count, + }, + }, graph_loader::source::csv_loader::CsvLoader, prelude::*, }; @@ -207,8 +213,16 @@ fn try_main_bm() -> Result<(), Box> { Ok(()) } +fn try_motif() -> Result<(), Box> { + let args: Vec = env::args().collect(); + let data_dir = Path::new(args.get(1).ok_or(MissingArgumentError)?); + let graph = loader(data_dir)?; + global_temporal_three_node_motif(&graph, 3600, None); + Ok(()) +} + fn main() { - if let Err(e) = try_main_bm() { + if let Err(e) = try_motif() { eprintln!("Failed: {}", e); std::process::exit(1) } diff --git a/raphtory-arrow-private b/raphtory-arrow-private new file mode 160000 index 0000000000..78f231a104 --- /dev/null +++ b/raphtory-arrow-private @@ -0,0 +1 @@ +Subproject commit 78f231a10456a514bd56ed5912a18f431f245b87 diff --git a/raphtory-benchmark/Cargo.toml b/raphtory-benchmark/Cargo.toml index c5ac34388d..131ce09534 100644 --- a/raphtory-benchmark/Cargo.toml +++ b/raphtory-benchmark/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" criterion = { workspace = true } raphtory = { path = "../raphtory", features = ["io"] } raphtory-graphql = { path = "../raphtory-graphql", version = "0.8.1" } -raphtory-arrow = { path = "../raphtory-arrow", version = "0.8.1" } +raphtory-arrow.workspace = true sorted_vector_map = { workspace = true } rand = { workspace = true } rayon = { workspace = true } diff --git a/raphtory-benchmark/benches/algobench.rs b/raphtory-benchmark/benches/algobench.rs index 2c23009d96..b879875321 100644 --- a/raphtory-benchmark/benches/algobench.rs +++ b/raphtory-benchmark/benches/algobench.rs @@ -8,7 +8,10 @@ use raphtory::{ clustering_coefficient::clustering_coefficient, local_clustering_coefficient::local_clustering_coefficient, }, - motifs::local_triangle_count::local_triangle_count, + motifs::{ + global_temporal_three_node_motifs::global_temporal_three_node_motif, + local_triangle_count::local_triangle_count, + }, }, graphgen::random_attachment::random_attachment, prelude::*, @@ -16,22 +19,6 @@ use raphtory::{ use rayon::prelude::*; mod common; - -//TODO swap to new trianglecount -// pub fn global_triangle_count_analysis(c: &mut Criterion) { -// let mut group = c.benchmark_group("global_triangle_count"); -// group.sample_size(10); -// bench(&mut group, "global_triangle_count", None, |b| { -// let g = raphtory_db::graph_loader::lotr_graph::lotr_graph(1); -// let windowed_graph = g.window(i64::MIN, i64::MAX); -// b.iter(|| { -// global_triangle_count(&windowed_graph).unwrap(); -// }); -// }); -// -// group.finish(); -// } - pub fn local_triangle_count_analysis(c: &mut Criterion) { let mut group = c.benchmark_group("local_triangle_count"); group.sample_size(10); @@ -132,6 +119,18 @@ pub fn graphgen_large_concomp(c: &mut Criterion) { group.finish() } +pub fn temporal_motifs(c: &mut Criterion) { + let mut group = c.benchmark_group("temporal_motifs"); + + bench(&mut group, "temporal_motifs", None, |b| { + let g: Graph = raphtory::graph_loader::example::lotr_graph::lotr_graph(); + + b.iter(|| global_temporal_three_node_motif(&g, 100, None)) + }); + + group.finish(); +} + criterion_group!( benches, local_triangle_count_analysis, @@ -139,5 +138,6 @@ criterion_group!( graphgen_large_clustering_coeff, graphgen_large_pagerank, graphgen_large_concomp, + temporal_motifs, ); criterion_main!(benches); diff --git a/raphtory-cypher/Cargo.toml b/raphtory-cypher/Cargo.toml index 1ac40e906b..e142fa9719 100644 --- a/raphtory-cypher/Cargo.toml +++ b/raphtory-cypher/Cargo.toml @@ -15,7 +15,7 @@ edition.workspace = true [dependencies] raphtory = { path = "../raphtory" } -raphtory-arrow = { path = "../raphtory-arrow", optional = true, version = "0.8.1"} +raphtory-arrow = { workspace = true, optional = true } arrow.workspace = true arrow-buffer.workspace = true arrow-schema.workspace = true diff --git a/raphtory-graphql/src/model/graph/edge.rs b/raphtory-graphql/src/model/graph/edge.rs index cb59c1589e..a513531c0a 100644 --- a/raphtory-graphql/src/model/graph/edge.rs +++ b/raphtory-graphql/src/model/graph/edge.rs @@ -1,5 +1,4 @@ use crate::model::graph::{node::Node, property::GqlProperties}; -use async_graphql::Error; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use itertools::Itertools; use raphtory::{ diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index c2aab2e7fd..e68f0825e3 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -15,7 +15,7 @@ homepage.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -raphtory-api = { path = "../raphtory-api", version="0.8.1" } +raphtory-api = { path = "../raphtory-api", version = "0.8.1" } bincode = { workspace = true } chrono = { workspace = true } itertools = { workspace = true } @@ -74,7 +74,7 @@ tempfile = { workspace = true, optional = true } bytemuck = { workspace = true, optional = true } rpds = { workspace = true, optional = true } thread_local = { workspace = true, optional = true } -raphtory-arrow = { path = "../raphtory-arrow", version="0.8.1", optional = true } +raphtory-arrow = { workspace = true, optional = true } [dev-dependencies] csv = { workspace = true } diff --git a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs index f57c9f50f5..f8a618bad8 100644 --- a/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs @@ -7,7 +7,7 @@ use crate::{ compute_state::ComputeStateVec, }, db::{ - api::view::{GraphViewOps, NodeViewOps, *}, + api::view::*, graph::views::node_subgraph::NodeSubgraph, task::{ context::Context, @@ -23,15 +23,11 @@ use std::collections::HashMap; /////////////////////////////////////////////////////// -pub fn star_motif_count( - graph: &G, - evv: &EvalNodeView, - deltas: Vec, -) -> Vec<[usize; 32]> +pub fn star_motif_count(evv: &EvalNodeView, deltas: Vec) -> Vec<[usize; 32]> where G: StaticGraphViewOps, { - let two_n_c = twonode_motif_count(graph, evv, deltas.clone()); + let two_n_c = twonode_motif_count(evv, deltas.clone()); let neigh_map: HashMap = evv .neighbours() .into_iter() @@ -75,11 +71,7 @@ where /////////////////////////////////////////////////////// -pub fn twonode_motif_count( - graph: &G, - evv: &EvalNodeView, - deltas: Vec, -) -> Vec<[usize; 8]> +pub fn twonode_motif_count(evv: &EvalNodeView, deltas: Vec) -> Vec<[usize; 8]> where G: StaticGraphViewOps, { @@ -87,8 +79,8 @@ where for nb in evv.neighbours().into_iter() { let nb_id = nb.id(); - let out = graph.edge(evv.id(), nb_id); - let inc = graph.edge(nb_id, evv.id()); + let out = evv.graph().edge(evv.id(), nb_id); + let inc = evv.graph().edge(nb_id, evv.id()); let events: Vec = out .iter() .flat_map(|e| e.explode()) @@ -181,7 +173,8 @@ where .sorted() .permutations(2) .flat_map(|e| { - g.edge(*e.first().unwrap(), *e.get(1).unwrap()) + u.graph() + .edge(*e.first().unwrap(), *e.get(1).unwrap()) .iter() .flat_map(|edge| edge.explode()) .collect::>() @@ -265,8 +258,7 @@ where let out1 = triangle_motifs(g, deltas.clone(), threads); let step1 = ATask::new(move |evv: &mut EvalNodeView| { - let g = evv.graph(); - let star_nodes = star_motif_count(g, evv, deltas.clone()); + let star_nodes = star_motif_count(evv, deltas.clone()); for (i, star) in star_nodes.iter().enumerate() { evv.global_update(&star_mc[i], *star); } diff --git a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs index 794dbdf5ec..b8ecc03656 100644 --- a/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs +++ b/raphtory/src/algorithms/motifs/local_temporal_three_node_motifs.rs @@ -126,7 +126,6 @@ where /////////////////////////////////////////////////////// pub fn twonode_motif_count<'a, 'b, G, GH>( - graph: &'a G, evv: &'a EvalNodeView<'b, '_, G, MotifCounter, GH>, deltas: Vec, ) -> Vec<[usize; 8]> @@ -144,8 +143,8 @@ where for nb in evv.neighbours().into_iter() { let nb_id = nb.id(); - let out = graph.edge(evv.id(), nb_id); - let inc = graph.edge(nb_id, evv.id()); + let out = evv.graph().edge(evv.id(), nb_id); + let inc = evv.graph().edge(nb_id, evv.id()); let events: Vec = out .iter() .flat_map(|e| e.explode()) @@ -337,8 +336,7 @@ where let out1 = triangle_motifs(g, deltas.clone(), motifs_counter, threads); let step1 = ATask::new(move |evv: &mut EvalNodeView| { - let g = evv.graph(); - let two_nodes = twonode_motif_count(g, evv, deltas.clone()); + let two_nodes = twonode_motif_count(evv, deltas.clone()); let star_nodes = star_motif_count(evv, deltas.clone()); *evv.get_mut() = MotifCounter::new( diff --git a/raphtory/src/arrow/graph_impl/core_ops.rs b/raphtory/src/arrow/graph_impl/core_ops.rs index ba032f2095..46ca6491e7 100644 --- a/raphtory/src/arrow/graph_impl/core_ops.rs +++ b/raphtory/src/arrow/graph_impl/core_ops.rs @@ -33,7 +33,7 @@ use crate::{ }; use itertools::Itertools; use polars_arrow::datatypes::ArrowDataType; -use raphtory_arrow::{properties::Properties, GID}; +use raphtory_arrow::{properties::Properties, GidRef, GID}; use rayon::prelude::*; impl CoreGraphOps for ArrowGraph { fn unfiltered_num_nodes(&self) -> usize { @@ -97,17 +97,17 @@ impl CoreGraphOps for ArrowGraph { fn node_id(&self, v: VID) -> u64 { match self.inner.node_gid(v).unwrap() { - GID::U64(n) => n, - GID::I64(n) => n as u64, - GID::Str(s) => s.id(), + GidRef::U64(n) => n, + GidRef::I64(n) => n as u64, + GidRef::Str(s) => s.id(), } } fn node_name(&self, v: VID) -> String { match self.inner.node_gid(v).unwrap() { - GID::U64(n) => n.to_string(), - GID::I64(n) => n.to_string(), - GID::Str(s) => s, + GidRef::U64(n) => n.to_string(), + GidRef::I64(n) => n.to_string(), + GidRef::Str(s) => s.to_owned(), } } @@ -200,7 +200,7 @@ impl CoreGraphOps for ArrowGraph { } fn core_nodes(&self) -> NodesStorage { - NodesStorage::Arrow(ArrowNodesOwned::new(&self.inner)) + NodesStorage::Arrow(ArrowNodesOwned::new(self.inner.clone())) } fn core_node_entry(&self, vid: VID) -> NodeStorageEntry { @@ -208,7 +208,7 @@ impl CoreGraphOps for ArrowGraph { } fn core_node_arc(&self, vid: VID) -> NodeOwnedEntry { - NodeOwnedEntry::Arrow(ArrowOwnedNode::new(&self.inner, vid)) + NodeOwnedEntry::Arrow(ArrowOwnedNode::new(self.inner.clone(), vid)) } fn core_edge_arc(&self, eid: ELID) -> EdgeOwnedEntry { diff --git a/raphtory/src/arrow/storage_interface/node.rs b/raphtory/src/arrow/storage_interface/node.rs index 06c95d2f9c..24bb91563e 100644 --- a/raphtory/src/arrow/storage_interface/node.rs +++ b/raphtory/src/arrow/storage_interface/node.rs @@ -1,6 +1,6 @@ use crate::{ core::{ - entities::{edges::edge_ref::EdgeRef, LayerIds, EID, VID}, + entities::{edges::edge_ref::EdgeRef, nodes::input_node::InputNode, LayerIds, EID, VID}, Direction, }, db::api::{ @@ -13,34 +13,27 @@ use crate::{ }, }; use itertools::Itertools; -use raphtory_arrow::{ - graph::TemporalGraph, graph_fragment::TempColGraphFragment, properties::Properties, - timestamps::TimeStamps, -}; +use raphtory_arrow::{graph::TemporalGraph, timestamps::TimeStamps, GidRef}; use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use std::{iter, sync::Arc}; #[derive(Copy, Clone, Debug)] pub struct ArrowNode<'a> { - pub(super) properties: Option<&'a Properties>, - pub(super) layers: &'a Arc<[TempColGraphFragment]>, + graph: &'a TemporalGraph, pub(super) vid: VID, } impl<'a> ArrowNode<'a> { pub(crate) fn new(graph: &'a TemporalGraph, vid: VID) -> Self { - Self { - properties: graph.node_properties(), - layers: graph.arc_layers(), - vid, - } + Self { graph, vid } } pub fn out_edges(self, layers: &'a LayerIds) -> impl Iterator + 'a { match layers { LayerIds::None => LayerVariants::None(iter::empty()), LayerIds::All => LayerVariants::All( - self.layers + self.graph + .layers() .iter() .enumerate() .map(|(layer_id, layer)| { @@ -54,7 +47,7 @@ impl<'a> ArrowNode<'a> { .kmerge_by(|e1, e2| e1.remote() <= e2.remote()), ), LayerIds::One(layer_id) => LayerVariants::One( - self.layers[*layer_id] + self.graph.layers()[*layer_id] .nodes_storage() .out_adj_list(self.vid) .map(move |(eid, dst)| { @@ -64,7 +57,7 @@ impl<'a> ArrowNode<'a> { LayerIds::Multiple(ids) => LayerVariants::Multiple( ids.iter() .map(|&layer_id| { - self.layers[layer_id] + self.graph.layers()[layer_id] .nodes_storage() .out_adj_list(self.vid) .map(move |(eid, dst)| { @@ -80,7 +73,8 @@ impl<'a> ArrowNode<'a> { match layers { LayerIds::None => LayerVariants::None(iter::empty()), LayerIds::All => LayerVariants::All( - self.layers + self.graph + .layers() .iter() .enumerate() .map(|(layer_id, layer)| { @@ -94,7 +88,7 @@ impl<'a> ArrowNode<'a> { .kmerge_by(|e1, e2| e1.remote() <= e2.remote()), ), LayerIds::One(layer_id) => LayerVariants::One( - self.layers[*layer_id] + self.graph.layers()[*layer_id] .nodes_storage() .in_adj_list(self.vid) .map(move |(eid, src)| { @@ -104,7 +98,7 @@ impl<'a> ArrowNode<'a> { LayerIds::Multiple(ids) => LayerVariants::Multiple( ids.iter() .map(|&layer_id| { - self.layers[layer_id] + self.graph.layers()[layer_id] .nodes_storage() .in_adj_list(self.vid) .map(move |(eid, src)| { @@ -125,8 +119,9 @@ impl<'a> ArrowNode<'a> { let mut additions = match layer_ids { LayerIds::None => Vec::with_capacity(1), LayerIds::All => { - let mut additions = Vec::with_capacity(self.layers.len() + 1); - self.layers + let mut additions = Vec::with_capacity(self.graph.layers().len() + 1); + self.graph + .layers() .par_iter() .map(|l| { TimeStamps::new(l.nodes_storage().additions().value(self.vid.index()), None) @@ -136,7 +131,7 @@ impl<'a> ArrowNode<'a> { } LayerIds::One(id) => { vec![TimeStamps::new( - self.layers[*id] + self.graph.layers()[*id] .nodes_storage() .additions() .value(self.vid.index()), @@ -148,7 +143,7 @@ impl<'a> ArrowNode<'a> { ids.par_iter() .map(|l| { TimeStamps::new( - self.layers[*l] + self.graph.layers()[*l] .nodes_storage() .additions() .value(self.vid.index()), @@ -159,7 +154,7 @@ impl<'a> ArrowNode<'a> { additions } }; - if let Some(props) = self.properties { + if let Some(props) = self.graph.node_properties() { let timestamps = props.temporal_props.timestamps::(self.vid); if timestamps.len() > 0 { let ts = timestamps.times(); @@ -174,15 +169,15 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { fn degree(self, layers: &LayerIds, dir: Direction) -> usize { let single_layer = match layers { LayerIds::None => return 0, - LayerIds::All => match self.layers.len() { + LayerIds::All => match self.graph.layers().len() { 0 => return 0, - 1 => Some(&self.layers[0]), + 1 => Some(&self.graph.layers()[0]), _ => None, }, - LayerIds::One(id) => Some(&self.layers[*id]), + LayerIds::One(id) => Some(&self.graph.layers()[*id]), LayerIds::Multiple(ids) => match ids.len() { 0 => return 0, - 1 => Some(&self.layers[ids[0]]), + 1 => Some(&self.graph.layers()[ids[0]]), _ => None, }, }; @@ -221,7 +216,8 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { } fn tprop(self, prop_id: usize) -> impl TPropOps<'a> { - self.properties + self.graph + .node_properties() .unwrap() .temporal_props .prop(self.vid, prop_id) @@ -247,30 +243,46 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { self.vid } + fn id(self) -> u64 { + match self.graph.node_gid(self.vid).unwrap() { + GidRef::U64(v) => v, + GidRef::I64(v) => v as u64, + GidRef::Str(v) => v.id(), + } + } + fn name(self) -> Option<&'a str> { - todo!() + match self.graph.node_gid(self.vid).unwrap() { + GidRef::U64(_) => None, + GidRef::I64(_) => None, + GidRef::Str(v) => Some(v), + } } fn find_edge(self, dst: VID, layer_ids: &LayerIds) -> Option { match layer_ids { LayerIds::None => None, - LayerIds::All => match self.layers.len() { + LayerIds::All => match self.graph.layers().len() { 0 => None, 1 => { - let eid = self.layers[0].nodes_storage().find_edge(self.vid, dst)?; + let eid = self.graph.layers()[0] + .nodes_storage() + .find_edge(self.vid, dst)?; Some(EdgeRef::new_outgoing(eid, self.vid, dst).at_layer(0)) } _ => todo!("multilayer edge views not implemented in arrow yet"), }, LayerIds::One(id) => { - let eid = self.layers[*id].nodes_storage().find_edge(self.vid, dst)?; + let eid = self.graph.layers()[*id] + .nodes_storage() + .find_edge(self.vid, dst)?; Some(EdgeRef::new_outgoing(eid, self.vid, dst).at_layer(*id)) } LayerIds::Multiple(ids) => match ids.len() { 0 => None, 1 => { let layer = ids[0]; - let eid = self.layers[layer] + let eid = self.graph.layers()[layer] .nodes_storage() .find_edge(self.vid, dst)?; Some(EdgeRef::new_outgoing(eid, self.vid, dst).at_layer(layer)) @@ -283,23 +295,17 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { #[derive(Clone, Debug)] pub struct ArrowOwnedNode { - properties: Option>, - layers: Arc<[TempColGraphFragment]>, + graph: Arc, vid: VID, } impl ArrowOwnedNode { - pub(crate) fn new(graph: &TemporalGraph, vid: VID) -> Self { - Self { - properties: graph.node_properties().cloned(), - layers: graph.arc_layers().clone(), - vid, - } + pub(crate) fn new(graph: Arc, vid: VID) -> Self { + Self { graph, vid } } pub fn as_ref(&self) -> ArrowNode { ArrowNode { - properties: self.properties.as_ref(), - layers: &self.layers, + graph: &self.graph, vid: self.vid, } } @@ -308,7 +314,7 @@ impl ArrowOwnedNode { match layers { LayerIds::None => LayerVariants::None(iter::empty()), LayerIds::All => { - let layers = self.layers; + let layers = self.graph.arc_layers().clone(); LayerVariants::All( (0..layers.len()) .map(move |layer_id| { @@ -328,7 +334,7 @@ impl ArrowOwnedNode { ) } LayerIds::One(layer_id) => { - let adj = self.layers[layer_id] + let adj = self.graph.layers()[layer_id] .nodes_storage() .adj_out() .clone() @@ -343,7 +349,7 @@ impl ArrowOwnedNode { (0..ids.len()) .map(move |i| { let layer_id = ids[i]; - let adj = self.layers[layer_id] + let adj = self.graph.layers()[layer_id] .nodes_storage() .adj_out() .clone() @@ -364,7 +370,7 @@ impl ArrowOwnedNode { match layers { LayerIds::None => LayerVariants::None(iter::empty()), LayerIds::All => { - let layers = self.layers; + let layers = self.graph.arc_layers().clone(); LayerVariants::All( (0..layers.len()) .map(move |layer_id| { @@ -390,7 +396,7 @@ impl ArrowOwnedNode { ) } LayerIds::One(layer_id) => { - let layer = &self.layers[layer_id]; + let layer = self.graph.layer(layer_id); let eids = layer .nodes_storage() .adj_in_edges() @@ -414,7 +420,7 @@ impl ArrowOwnedNode { (0..ids.len()) .map(move |i| { let layer_id = ids[i]; - let layer = &self.layers[layer_id]; + let layer = self.graph.layer(layer_id); let eids = layer .nodes_storage() .adj_in_edges() @@ -483,6 +489,11 @@ impl<'a> NodeStorageOps<'a> for &'a ArrowOwnedNode { self.vid } + #[inline] + fn id(self) -> u64 { + self.as_ref().id() + } + fn name(self) -> Option<&'a str> { self.as_ref().name() } diff --git a/raphtory/src/arrow/storage_interface/nodes.rs b/raphtory/src/arrow/storage_interface/nodes.rs index bb894416a9..899347cbea 100644 --- a/raphtory/src/arrow/storage_interface/nodes.rs +++ b/raphtory/src/arrow/storage_interface/nodes.rs @@ -3,40 +3,24 @@ use crate::{ core::entities::VID, }; -use raphtory_arrow::{ - graph::TemporalGraph, graph_fragment::TempColGraphFragment, properties::Properties, -}; +use raphtory_arrow::graph::TemporalGraph; use std::sync::Arc; #[derive(Clone, Debug)] pub struct ArrowNodesOwned { - num_nodes: usize, - properties: Option>, - layers: Arc<[TempColGraphFragment]>, + graph: Arc, } impl ArrowNodesOwned { - pub(crate) fn new(graph: &TemporalGraph) -> Self { - Self { - num_nodes: graph.num_nodes(), - properties: graph.node_properties().cloned(), - layers: graph.layers().into(), - } + pub(crate) fn new(graph: Arc) -> Self { + Self { graph } } pub fn node(&self, vid: VID) -> ArrowNode { - ArrowNode { - properties: self.properties.as_ref(), - layers: &self.layers, - vid, - } + ArrowNode::new(&self.graph, vid) } pub fn as_ref(&self) -> ArrowNodesRef { - ArrowNodesRef { - num_nodes: self.num_nodes, - properties: self.properties.as_ref(), - layers: &self.layers, - } + ArrowNodesRef::new(&self.graph) } } diff --git a/raphtory/src/arrow/storage_interface/nodes_ref.rs b/raphtory/src/arrow/storage_interface/nodes_ref.rs index a2f9c5d20e..441bdbad60 100644 --- a/raphtory/src/arrow/storage_interface/nodes_ref.rs +++ b/raphtory/src/arrow/storage_interface/nodes_ref.rs @@ -1,41 +1,28 @@ use crate::{arrow::storage_interface::node::ArrowNode, core::entities::VID}; -use raphtory_arrow::{ - graph::TemporalGraph, graph_fragment::TempColGraphFragment, properties::Properties, -}; +use raphtory_arrow::graph::TemporalGraph; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use std::sync::Arc; #[derive(Copy, Clone, Debug)] pub struct ArrowNodesRef<'a> { - pub(super) num_nodes: usize, - pub(super) properties: Option<&'a Properties>, - pub(super) layers: &'a Arc<[TempColGraphFragment]>, + graph: &'a TemporalGraph, } impl<'a> ArrowNodesRef<'a> { pub(crate) fn new(graph: &'a TemporalGraph) -> Self { - Self { - num_nodes: graph.num_nodes(), - properties: graph.node_properties(), - layers: graph.arc_layers(), - } + Self { graph } } pub fn node(self, vid: VID) -> ArrowNode<'a> { - ArrowNode { - properties: self.properties, - layers: self.layers, - vid, - } + ArrowNode::new(self.graph, vid) } pub fn par_iter(self) -> impl IndexedParallelIterator> { - (0..self.num_nodes) + (0..self.graph.num_nodes()) .into_par_iter() .map(move |vid| self.node(VID(vid))) } pub fn iter(self) -> impl Iterator> { - (0..self.num_nodes).map(move |vid| self.node(VID(vid))) + (0..self.graph.num_nodes()).map(move |vid| self.node(VID(vid))) } } diff --git a/raphtory/src/db/api/storage/edges/edges.rs b/raphtory/src/db/api/storage/edges/edges.rs index 64ca63f915..559c26f7b3 100644 --- a/raphtory/src/db/api/storage/edges/edges.rs +++ b/raphtory/src/db/api/storage/edges/edges.rs @@ -41,25 +41,6 @@ pub enum EdgesStorageRef<'a> { } impl<'a> EdgesStorageRef<'a> { - pub fn get_layer(self, eid: EID, layer_id: usize) -> EdgeStorageRef<'a> { - match self { - EdgesStorageRef::Mem(storage) => EdgeStorageRef::Mem(storage.get(eid)), - #[cfg(feature = "arrow")] - EdgesStorageRef::Arrow(storage) => EdgeStorageRef::Arrow(storage.edge(eid, layer_id)), - } - } - - #[inline] - pub fn get(self, eid: EID) -> EdgeStorageRef<'a> { - match self { - EdgesStorageRef::Mem(storage) => EdgeStorageRef::Mem(storage.get(eid)), - #[cfg(feature = "arrow")] - EdgesStorageRef::Arrow(_) => { - todo!("getting multilayer edge not implemented for arrow graph") - } - } - } - #[cfg(feature = "arrow")] pub fn iter(self, layers: LayerIds) -> impl Iterator> { match self { diff --git a/raphtory/src/db/api/storage/nodes/node_entry.rs b/raphtory/src/db/api/storage/nodes/node_entry.rs index 4d8e3b8398..d699b2b59e 100644 --- a/raphtory/src/db/api/storage/nodes/node_entry.rs +++ b/raphtory/src/db/api/storage/nodes/node_entry.rs @@ -97,6 +97,10 @@ impl<'a, 'b: 'a> NodeStorageOps<'a> for &'a NodeStorageEntry<'b> { for_all!(self, node => node.vid()) } + fn id(self) -> u64 { + for_all!(self, node => node.id()) + } + fn name(self) -> Option<&'a str> { for_all!(self, node => node.name()) } diff --git a/raphtory/src/db/api/storage/nodes/node_owned_entry.rs b/raphtory/src/db/api/storage/nodes/node_owned_entry.rs index 33b3990bbb..8d16aa3297 100644 --- a/raphtory/src/db/api/storage/nodes/node_owned_entry.rs +++ b/raphtory/src/db/api/storage/nodes/node_owned_entry.rs @@ -94,6 +94,10 @@ impl<'a> NodeStorageOps<'a> for &'a NodeOwnedEntry { for_all!(self, node => node.vid()) } + fn id(self) -> u64 { + for_all!(self, node => node.id()) + } + fn name(self) -> Option<&'a str> { for_all!(self, node => node.name()) } diff --git a/raphtory/src/db/api/storage/nodes/node_ref.rs b/raphtory/src/db/api/storage/nodes/node_ref.rs index 6d26c33d47..2fa626818e 100644 --- a/raphtory/src/db/api/storage/nodes/node_ref.rs +++ b/raphtory/src/db/api/storage/nodes/node_ref.rs @@ -91,6 +91,10 @@ impl<'a> NodeStorageOps<'a> for NodeStorageRef<'a> { for_all!(self, node => node.vid()) } + fn id(self) -> u64 { + for_all!(self, node => node.id()) + } + fn name(self) -> Option<&'a str> { for_all!(self, node => node.name()) } diff --git a/raphtory/src/db/api/storage/nodes/node_storage_ops.rs b/raphtory/src/db/api/storage/nodes/node_storage_ops.rs index 027886d50b..14bb843e67 100644 --- a/raphtory/src/db/api/storage/nodes/node_storage_ops.rs +++ b/raphtory/src/db/api/storage/nodes/node_storage_ops.rs @@ -25,6 +25,8 @@ pub trait NodeStorageOps<'a>: Sized { fn vid(self) -> VID; + fn id(self) -> u64; + fn name(self) -> Option<&'a str>; fn find_edge(self, dst: VID, layer_ids: &LayerIds) -> Option; @@ -59,6 +61,10 @@ impl<'a> NodeStorageOps<'a> for &'a NodeStore { self.vid } + fn id(self) -> u64 { + self.global_id + } + fn name(self) -> Option<&'a str> { self.name.as_str() } diff --git a/raphtory/src/db/api/storage/storage_ops.rs b/raphtory/src/db/api/storage/storage_ops.rs index 10d136afe8..e6e387d2e8 100644 --- a/raphtory/src/db/api/storage/storage_ops.rs +++ b/raphtory/src/db/api/storage/storage_ops.rs @@ -1,17 +1,3 @@ -#[cfg(feature = "arrow")] -use crate::{ - arrow::storage_interface::{ - edges::ArrowEdges, - edges_ref::ArrowEdgesRef, - node::{ArrowNode, ArrowOwnedNode}, - nodes::ArrowNodesOwned, - nodes_ref::ArrowNodesRef, - }, - db::api::storage::variants::storage_variants::StorageVariants, -}; -#[cfg(feature = "arrow")] -use raphtory_arrow::graph::TemporalGraph; - use crate::{ core::{ entities::{edges::edge_ref::EdgeRef, LayerIds, EID, VID}, @@ -43,7 +29,23 @@ use crate::{ }; use itertools::Itertools; use rayon::prelude::*; -use std::{iter, sync::Arc}; +use std::iter; + +#[cfg(feature = "arrow")] +use crate::{ + arrow::storage_interface::{ + edges::ArrowEdges, + edges_ref::ArrowEdgesRef, + node::{ArrowNode, ArrowOwnedNode}, + nodes::ArrowNodesOwned, + nodes_ref::ArrowNodesRef, + }, + db::api::storage::variants::storage_variants::StorageVariants, +}; +#[cfg(feature = "arrow")] +use raphtory_arrow::graph::TemporalGraph; +#[cfg(feature = "arrow")] +use std::sync::Arc; #[derive(Debug, Clone)] pub enum GraphStorage { @@ -65,7 +67,9 @@ impl GraphStorage { match self { GraphStorage::Mem(storage) => NodesStorage::Mem(storage.nodes.clone()), #[cfg(feature = "arrow")] - GraphStorage::Arrow(storage) => NodesStorage::Arrow(ArrowNodesOwned::new(storage)), + GraphStorage::Arrow(storage) => { + NodesStorage::Arrow(ArrowNodesOwned::new(storage.clone())) + } } } @@ -82,7 +86,7 @@ impl GraphStorage { GraphStorage::Mem(storage) => NodeOwnedEntry::Mem(storage.nodes.arc_entry(vid)), #[cfg(feature = "arrow")] GraphStorage::Arrow(storage) => { - NodeOwnedEntry::Arrow(ArrowOwnedNode::new(storage, vid)) + NodeOwnedEntry::Arrow(ArrowOwnedNode::new(storage.clone(), vid)) } } } diff --git a/raphtory/src/db/api/view/internal/core_ops.rs b/raphtory/src/db/api/view/internal/core_ops.rs index 1e49986684..97bc4d700d 100644 --- a/raphtory/src/db/api/view/internal/core_ops.rs +++ b/raphtory/src/db/api/view/internal/core_ops.rs @@ -26,10 +26,12 @@ use crate::{ }, }; use enum_dispatch::enum_dispatch; +use std::ops::Range; + #[cfg(feature = "arrow")] use raphtory_arrow::timestamps::TimeStamps; +#[cfg(feature = "arrow")] use rayon::prelude::*; -use std::ops::Range; /// Core functions that should (almost-)always be implemented by pointing at the underlying graph. #[enum_dispatch] diff --git a/raphtory/src/db/api/view/node.rs b/raphtory/src/db/api/view/node.rs index 05f077ea25..5a57a0a68b 100644 --- a/raphtory/src/db/api/view/node.rs +++ b/raphtory/src/db/api/view/node.rs @@ -6,7 +6,7 @@ use crate::{ }, db::api::{ properties::{internal::PropertiesOps, Properties}, - storage::storage_ops::GraphStorage, + storage::{nodes::node_storage_ops::NodeStorageOps, storage_ops::GraphStorage}, view::{ internal::{CoreGraphOps, OneHopFilter, TimeSemantics}, reset_filter::ResetFilter, @@ -175,7 +175,7 @@ impl<'graph, V: BaseNodeViewOps<'graph> + 'graph> NodeViewOps<'graph> for V { #[inline] fn id(&self) -> Self::ValueType { - self.map(|_cg, g, v| g.node_id(v)) + self.map(|cg, _g, v| cg.node(v).id()) } #[inline] fn name(&self) -> Self::ValueType { diff --git a/raphtory/src/db/task/edge/eval_edge.rs b/raphtory/src/db/task/edge/eval_edge.rs index 5f0a19915f..29b022aadf 100644 --- a/raphtory/src/db/task/edge/eval_edge.rs +++ b/raphtory/src/db/task/edge/eval_edge.rs @@ -18,7 +18,7 @@ use crate::{ use crate::db::task::edge::eval_edges::EvalEdges; -use crate::db::api::storage::storage_ops::GraphStorage; +use crate::db::{api::storage::storage_ops::GraphStorage, task::eval_graph::EvalGraph}; use std::{cell::RefCell, rc::Rc}; pub struct EvalEdgeView<'graph, 'a, G, GH, CS: Clone, S> { @@ -102,15 +102,19 @@ impl< let node_state = self.node_state.clone(); let local_state_prev = self.local_state_prev; let storage = self.storage; - EvalNodeView { + let base_graph = self.edge.base_graph; + let eval_graph = EvalGraph { ss, - node: node.node, - graph: node.base_graph, - base_graph: node.base_graph, + base_graph, storage, - local_state: None, local_state_prev, node_state, + }; + EvalNodeView { + node: node.node, + graph: node.base_graph, + eval_graph, + local_state: None, } } diff --git a/raphtory/src/db/task/edge/eval_edges.rs b/raphtory/src/db/task/edge/eval_edges.rs index d11bd896fe..afbea45040 100644 --- a/raphtory/src/db/task/edge/eval_edges.rs +++ b/raphtory/src/db/task/edge/eval_edges.rs @@ -12,6 +12,7 @@ use crate::{ graph::edges::Edges, task::{ edge::eval_edge::EvalEdgeView, + eval_graph::EvalGraph, node::{eval_node::EvalPathFromNode, eval_node_state::EVState}, task_state::PrevLocalState, }, @@ -168,14 +169,18 @@ impl< let local_state_prev = self.local_state_prev; let path = self.edges.map_nodes(op); let base_graph = self.edges.base_graph; + let storage = self.storage; + let eval_graph = EvalGraph { + ss, + base_graph, + storage, + local_state_prev, + node_state, + }; EvalPathFromNode { graph: base_graph, - base_graph, + base_graph: eval_graph, op: path.op, - ss, - node_state, - local_state_prev, - storage: self.storage, } } diff --git a/raphtory/src/db/task/eval_graph.rs b/raphtory/src/db/task/eval_graph.rs new file mode 100644 index 0000000000..421d3c9379 --- /dev/null +++ b/raphtory/src/db/task/eval_graph.rs @@ -0,0 +1,61 @@ +use crate::{ + core::{ + entities::nodes::node_ref::AsNodeRef, + state::compute_state::{ComputeState, ComputeStateVec}, + }, + db::{ + api::storage::storage_ops::GraphStorage, + task::{ + edge::eval_edge::EvalEdgeView, + node::{eval_node::EvalNodeView, eval_node_state::EVState}, + task_state::PrevLocalState, + }, + }, + prelude::GraphViewOps, +}; +use std::{cell::RefCell, rc::Rc}; + +#[derive(Debug)] +pub struct EvalGraph<'graph, 'a, G, S, CS: Clone = ComputeStateVec> { + pub(crate) ss: usize, + pub(crate) base_graph: &'graph G, + pub(crate) storage: &'graph GraphStorage, + pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, + pub(crate) node_state: Rc>>, +} + +impl<'graph, 'a, G, S, CS: Clone> Clone for EvalGraph<'graph, 'a, G, S, CS> { + fn clone(&self) -> Self { + Self { + ss: self.ss, + base_graph: self.base_graph, + storage: self.storage, + local_state_prev: self.local_state_prev, + node_state: self.node_state.clone(), + } + } +} + +impl<'graph, 'a: 'graph, G: GraphViewOps<'graph>, S: 'static, CS: ComputeState + Clone> + EvalGraph<'graph, 'a, G, S, CS> +{ + pub fn node(&self, n: impl AsNodeRef) -> Option> { + let node = (&self.base_graph).node(n)?; + Some(EvalNodeView::new_local(node.node, self.clone(), None)) + } + + pub fn edge( + &self, + src: N, + dst: N, + ) -> Option> { + let edge = (&self.base_graph).edge(src, dst)?; + Some(EvalEdgeView::new( + self.ss, + edge, + self.storage, + self.node_state.clone(), + self.local_state_prev, + )) + } +} diff --git a/raphtory/src/db/task/mod.rs b/raphtory/src/db/task/mod.rs index afb69622d4..141ef726e9 100644 --- a/raphtory/src/db/task/mod.rs +++ b/raphtory/src/db/task/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; pub mod context; pub mod edge; +mod eval_graph; pub mod node; pub mod task; pub mod task_runner; diff --git a/raphtory/src/db/task/node/eval_node.rs b/raphtory/src/db/task/node/eval_node.rs index a799159c09..3c252467d8 100644 --- a/raphtory/src/db/task/node/eval_node.rs +++ b/raphtory/src/db/task/node/eval_node.rs @@ -16,49 +16,34 @@ use crate::{ }, graph::{edges::Edges, node::NodeView, path::PathFromNode}, task::{ - edge::eval_edges::EvalEdges, node::eval_node_state::EVState, task_state::PrevLocalState, + edge::eval_edges::EvalEdges, eval_graph::EvalGraph, node::eval_node_state::EVState, }, }, prelude::{GraphViewOps, NodeTypesFilter}, }; -use std::{ - cell::{Ref, RefCell}, - rc::Rc, - sync::Arc, -}; +use std::{cell::Ref, sync::Arc}; pub struct EvalNodeView<'graph, 'a: 'graph, G, S, GH = &'graph G, CS: Clone = ComputeStateVec> { - pub(crate) ss: usize, - pub(crate) node: VID, - pub(crate) base_graph: &'graph G, + pub node: VID, + pub(crate) eval_graph: EvalGraph<'graph, 'a, G, S, CS>, pub(crate) graph: GH, - pub(crate) storage: &'graph GraphStorage, pub(crate) local_state: Option<&'graph mut S>, - pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, - pub(crate) node_state: Rc>>, } impl<'graph, 'a: 'graph, G: GraphViewOps<'graph>, CS: ComputeState + 'a, S> EvalNodeView<'graph, 'a, G, S, &'graph G, CS> { pub(crate) fn new_local( - ss: usize, node: VID, - g: &'graph G, - storage: &'graph GraphStorage, + eval_graph: EvalGraph<'graph, 'a, G, S, CS>, local_state: Option<&'graph mut S>, - local_state_prev: &'graph PrevLocalState<'a, S>, - node_state: Rc>>, ) -> Self { + let graph = eval_graph.base_graph; Self { - ss, node, - base_graph: g, - graph: g, - storage, + eval_graph, + graph, local_state, - local_state_prev, - node_state, } } } @@ -73,16 +58,12 @@ impl< > Clone for EvalNodeView<'graph, 'a, G, S, GH, CS> { fn clone(&self) -> Self { - EvalNodeView::new_filtered( - self.ss, - self.node, - self.base_graph, - self.graph.clone(), - self.storage, - None, - self.local_state_prev, - self.node_state.clone(), - ) + Self { + node: self.node, + eval_graph: self.eval_graph.clone(), + graph: self.graph.clone(), + local_state: None, + } } } @@ -95,12 +76,12 @@ impl< GH: GraphViewOps<'graph>, > EvalNodeView<'graph, 'a, G, S, GH, CS> { - pub fn graph(&self) -> GH { - self.graph.clone() + pub fn graph(&self) -> EvalGraph<'graph, 'a, G, S, CS> { + self.eval_graph.clone() } pub fn prev(&self) -> &S { let VID(i) = self.node; - &self.local_state_prev.state[i] + &self.eval_graph.local_state_prev.state[i] } pub fn get_mut(&mut self) -> &mut S { @@ -118,24 +99,16 @@ impl< } pub(crate) fn new_filtered( - ss: usize, node: VID, - base_graph: &'graph G, + eval_graph: EvalGraph<'graph, 'a, G, S, CS>, graph: GH, - storage: &'graph GraphStorage, local_state: Option<&'graph mut S>, - local_state_prev: &'graph PrevLocalState<'a, S>, - node_state: Rc>>, ) -> Self { Self { - ss, node, - base_graph, + eval_graph, graph, - storage, local_state, - local_state_prev, - node_state, } } @@ -149,10 +122,11 @@ impl< id: &AccId, a: IN, ) { - self.node_state + self.eval_graph + .node_state .borrow_mut() .shard_mut() - .accumulate_into(self.ss, self.pid(), a, id); + .accumulate_into(self.eval_graph.ss, self.pid(), a, id); } pub fn global_update>( @@ -160,10 +134,11 @@ impl< id: &AccId, a: IN, ) { - self.node_state + self.eval_graph + .node_state .borrow_mut() .global_mut() - .accumulate_global(self.ss, a, id); + .accumulate_global(self.eval_graph.ss, a, id); } /// Reads the global state for a given accumulator, returned value is the global @@ -191,7 +166,11 @@ impl< OUT: StateType, A: StateType, { - self.node_state.borrow().global().read_global(self.ss, agg) + self.eval_graph + .node_state + .borrow() + .global() + .read_global(self.eval_graph.ss, agg) } /// Read the current value of the node state using the given accumulator. @@ -204,10 +183,11 @@ impl< A: StateType, OUT: std::fmt::Debug, { - self.node_state + self.eval_graph + .node_state .borrow() .shard() - .read_with_pid(self.ss, self.pid(), agg_r) + .read_with_pid(self.eval_graph.ss, self.pid(), agg_r) .unwrap_or(ACC::finish(&ACC::zero())) } @@ -221,7 +201,12 @@ impl< A: StateType, OUT: std::fmt::Debug, { - Entry::new(self.node_state.borrow(), *agg_r, &self.node, self.ss) + Entry::new( + self.eval_graph.node_state.borrow(), + *agg_r, + &self.node, + self.eval_graph.ss, + ) } /// Read the prev value of the node state using the given accumulator. @@ -234,10 +219,11 @@ impl< A: StateType, OUT: std::fmt::Debug, { - self.node_state + self.eval_graph + .node_state .borrow() .shard() - .read_with_pid(self.ss + 1, self.pid(), agg_r) + .read_with_pid(self.eval_graph.ss + 1, self.pid(), agg_r) .unwrap_or(ACC::finish(&ACC::zero())) } @@ -249,10 +235,11 @@ impl< A: StateType, OUT: std::fmt::Debug, { - self.node_state + self.eval_graph + .node_state .borrow() .global() - .read_global(self.ss + 1, agg_r) + .read_global(self.eval_graph.ss + 1, agg_r) .unwrap_or(ACC::finish(&ACC::zero())) } } @@ -266,12 +253,8 @@ pub struct EvalPathFromNode< S, > { pub graph: GH, - pub(crate) base_graph: &'graph G, + pub(crate) base_graph: EvalGraph<'graph, 'a, G, S, CS>, pub(crate) op: Arc BoxedLIter<'graph, VID> + Send + Sync + 'graph>, - pub(crate) storage: &'graph GraphStorage, - pub(crate) ss: usize, - pub(crate) node_state: Rc>>, - pub(crate) local_state_prev: &'graph PrevLocalState<'a, S>, } impl< @@ -288,24 +271,10 @@ impl< } pub fn iter(&self) -> impl Iterator> + 'graph { - let local = self.local_state_prev; - let node_state = self.node_state.clone(); - let ss = self.ss; - let base_graph = self.base_graph; + let base_graph = self.base_graph.clone(); let graph = self.graph.clone(); - let storage = self.storage; - self.iter_refs().map(move |v| { - EvalNodeView::new_filtered( - ss, - v, - base_graph, - graph.clone(), - storage, - None, - local, - node_state.clone(), - ) - }) + self.iter_refs() + .map(move |v| EvalNodeView::new_filtered(v, base_graph.clone(), graph.clone(), None)) } } @@ -338,12 +307,8 @@ impl< fn clone(&self) -> Self { EvalPathFromNode { graph: self.graph.clone(), - base_graph: self.base_graph, + base_graph: self.base_graph.clone(), op: self.op.clone(), - storage: self.storage, - ss: self.ss, - node_state: self.node_state.clone(), - local_state_prev: self.local_state_prev, } } } @@ -372,7 +337,7 @@ impl< op: F, ) -> Self::ValueType { let graph = self.graph.clone(); - let storage = self.storage; + let storage = self.base_graph.storage; Box::new(self.iter_refs().map(move |node| op(storage, &graph, node))) } @@ -387,12 +352,12 @@ impl< &self, op: F, ) -> Self::Edges { - let local_state_prev = self.local_state_prev; - let node_state = self.node_state.clone(); - let ss = self.ss; - let storage = self.storage; + let local_state_prev = self.base_graph.local_state_prev; + let node_state = self.base_graph.node_state.clone(); + let ss = self.base_graph.ss; + let storage = self.base_graph.storage; let path = PathFromNode::new_one_hop_filtered( - self.base_graph, + self.base_graph.base_graph, self.graph.clone(), self.op.clone(), ); @@ -415,7 +380,7 @@ impl< ) -> Self::PathType { let old_op = self.op.clone(); let graph = self.graph.clone(); - let storage = self.storage; + let storage = self.base_graph.storage; let new_op = Arc::new(move || { let op = op.clone(); let graph = graph.clone(); @@ -423,17 +388,11 @@ impl< .flat_map(move |vv| op(storage, &graph, vv)) .into_dyn_boxed() }); - let ss = self.ss; - let node_state = self.node_state.clone(); - let local_state_prev = self.local_state_prev; + EvalPathFromNode { - graph: self.base_graph, - base_graph: self.base_graph, + graph: self.base_graph.base_graph, + base_graph: self.base_graph.clone(), op: new_op, - storage, - ss, - node_state, - local_state_prev, } } } @@ -456,25 +415,18 @@ impl< } fn base_graph(&self) -> &Self::BaseGraph { - &self.base_graph + &self.base_graph.base_graph } fn one_hop_filtered>( &self, filtered_graph: GHH, ) -> Self::Filtered { - let storage = self.storage; - let local_state_prev = self.local_state_prev; - let node_state = self.node_state.clone(); - let ss = self.ss; + let base_graph = self.base_graph.clone(); EvalPathFromNode { graph: filtered_graph, - base_graph: self.base_graph, + base_graph, op: self.op.clone(), - storage, - ss, - node_state, - local_state_prev, } } } @@ -497,23 +449,15 @@ impl< } fn base_graph(&self) -> &Self::BaseGraph { - &self.base_graph + &self.eval_graph.base_graph } fn one_hop_filtered>( &self, filtered_graph: GHH, ) -> Self::Filtered { - EvalNodeView::new_filtered( - self.ss, - self.node, - self.base_graph, - filtered_graph, - self.storage, - None, - self.local_state_prev, - self.node_state.clone(), - ) + let eval_graph = self.eval_graph.clone(); + EvalNodeView::new_filtered(self.node, eval_graph, filtered_graph, None) } } @@ -540,7 +484,7 @@ impl< &self, op: F, ) -> Self::ValueType { - op(self.storage, &self.graph, self.node) + op(self.eval_graph.storage, &self.graph, self.node) } fn as_props(&self) -> Self::ValueType> { @@ -554,15 +498,15 @@ impl< &self, op: F, ) -> Self::Edges { - let ss = self.ss; - let local_state_prev = self.local_state_prev; - let node_state = self.node_state.clone(); + let ss = self.eval_graph.ss; + let local_state_prev = self.eval_graph.local_state_prev; + let node_state = self.eval_graph.node_state.clone(); let node = self.node; - let storage = self.storage; + let storage = self.eval_graph.storage; let graph = self.graph.clone(); let edges = Arc::new(move || op(storage, &graph, node).into_dyn_boxed()); let edges = Edges { - base_graph: self.base_graph, + base_graph: self.eval_graph.base_graph, graph: self.graph.clone(), edges, }; @@ -584,20 +528,13 @@ impl< ) -> Self::PathType { let graph = self.graph.clone(); let node = self.node; - let storage = self.storage; - + let storage = self.eval_graph.storage; let path_op = Arc::new(move || op(storage, &graph, node).into_dyn_boxed()); - let ss = self.ss; - let local_state_prev = self.local_state_prev; - let node_state = self.node_state.clone(); + let eval_graph = self.eval_graph.clone(); EvalPathFromNode { - graph: self.base_graph, - base_graph: self.base_graph, + graph: eval_graph.base_graph, + base_graph: eval_graph, op: path_op, - storage, - local_state_prev, - node_state, - ss, } } } diff --git a/raphtory/src/db/task/task_runner.rs b/raphtory/src/db/task/task_runner.rs index 735ed98b20..344f0c9c42 100644 --- a/raphtory/src/db/task/task_runner.rs +++ b/raphtory/src/db/task/task_runner.rs @@ -15,7 +15,10 @@ use crate::{ }, db::{ api::{storage::storage_ops::GraphStorage, view::StaticGraphViewOps}, - task::node::{eval_node::EvalNodeView, eval_node_state::EVState}, + task::{ + eval_graph::EvalGraph, + node::{eval_node::EvalNodeView, eval_node_state::EVState}, + }, }, }; use rayon::{prelude::*, ThreadPool}; @@ -61,24 +64,21 @@ impl TaskRunner { let global_state_view = global_state.as_cow(); let g = self.ctx.graph(); - let mut done = true; - let node_state = EVState::rc_from(shard_state_view, global_state_view); - let local = PrevLocalState::new(prev_local_state); let mut v_ref = morcel_id * morcel_size; + for local_state in morcel { if g.has_node(VID(v_ref)) { - let mut vv = EvalNodeView::new_local( - self.ctx.ss(), - v_ref.into(), - &g, + let eval_graph = EvalGraph { + ss: self.ctx.ss(), + base_graph: &g, storage, - Some(local_state), - &local, - node_state.clone(), - ); + local_state_prev: &local, + node_state: node_state.clone(), + }; + let mut vv = EvalNodeView::new_local(v_ref.into(), eval_graph, Some(local_state)); match task.run(&mut vv) { Step::Continue => { diff --git a/raphtory/src/db/task/task_state.rs b/raphtory/src/db/task/task_state.rs index 3aaf552916..8c9d7654ca 100644 --- a/raphtory/src/db/task/task_state.rs +++ b/raphtory/src/db/task/task_state.rs @@ -9,7 +9,7 @@ pub struct Global(Arc>); #[derive(Clone, Debug)] pub struct Shard(Arc>); -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub(crate) struct PrevLocalState<'a, S> { pub(crate) state: &'a Vec, } diff --git a/raphtory/src/python/graph/edges.rs b/raphtory/src/python/graph/edges.rs index 5eed35478a..ccae2313c6 100644 --- a/raphtory/src/python/graph/edges.rs +++ b/raphtory/src/python/graph/edges.rs @@ -19,9 +19,8 @@ use crate::{ ArcStringIterable, ArcStringVecIterable, BoolIterable, I64Iterable, I64VecIterable, NestedArcStringIterable, NestedArcStringVecIterable, NestedBoolIterable, NestedI64VecIterable, NestedOptionI64Iterable, NestedU64U64Iterable, - NestedUtcDateTimeIterable, NestedVecUtcDateTimeIterable, OptionArcStringIterable, - OptionI64Iterable, OptionUtcDateTimeIterable, OptionVecUtcDateTimeIterable, - U64U64Iterable, + NestedUtcDateTimeIterable, NestedVecUtcDateTimeIterable, OptionI64Iterable, + OptionUtcDateTimeIterable, OptionVecUtcDateTimeIterable, U64U64Iterable, }, }, utils::{ diff --git a/scripts/activate_private_arrow.py b/scripts/activate_private_arrow.py new file mode 100755 index 0000000000..5711de5513 --- /dev/null +++ b/scripts/activate_private_arrow.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +import subprocess +from pathlib import Path +import re + +directory = "./raphtory-arrow" +root_dir = Path(__file__).parent.parent +toml_file = root_dir / "Cargo.toml" + +with open(toml_file, "r") as f: + lines = f.readlines() + +for i, line in enumerate(lines[:-1]): + if "#[private-arrow]" in line: + next_line = lines[i + 1] + if next_line.strip().startswith("#") and "raphtory-arrow" in next_line: + lines[i + 1] = re.sub(r"#\s*", "", next_line, 1) + if "#[public-arrow]" in line: + next_line = lines[i + 1] + if next_line.strip().startswith("raphtory-arrow"): + lines[i + 1] = next_line.replace("raphtory-arrow", "# raphtory-arrow", 1) + +with open(toml_file, "w") as f: + f.writelines(lines) diff --git a/scripts/deactivate_private_arrow.py b/scripts/deactivate_private_arrow.py new file mode 100755 index 0000000000..c928ab6e30 --- /dev/null +++ b/scripts/deactivate_private_arrow.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +import subprocess +from pathlib import Path +import re + +directory = "./raphtory-arrow" +root_dir = Path(__file__).parent.parent +toml_file = root_dir / "Cargo.toml" + +with open(toml_file, "r") as f: + lines = f.readlines() + +for i, line in enumerate(lines[:-1]): + if "#[public-arrow]" in line: + next_line = lines[i + 1] + if next_line.strip().startswith("#") and "raphtory-arrow" in next_line: + lines[i + 1] = re.sub(r"#\s*", "", next_line, 1) + if "#[private-arrow]" in line: + next_line = lines[i + 1] + if next_line.strip().startswith("raphtory-arrow"): + lines[i + 1] = next_line.replace("raphtory-arrow", "# raphtory-arrow", 1) + +with open(toml_file, "w") as f: + f.writelines(lines) diff --git a/scripts/flip_ra.py b/scripts/flip_ra.py deleted file mode 100755 index 73e21ebaba..0000000000 --- a/scripts/flip_ra.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 -import shutil -import os, stat -import subprocess - -directory = "./raphtory-arrow" - -def remove_readonly(func, path, _): - "Clear the readonly bit and reattempt the removal" - os.chmod(path, stat.S_IWRITE) - func(path) - -shutil.rmtree(directory, onerror=remove_readonly) - -subprocess.run( - [ - "git", - "clone", - "--depth", - "1", - "git@github.com:Pometry/raphtory-arrow.git", - "raphtory-arrow", - ] -)