diff --git a/Cargo.lock b/Cargo.lock index 4cac904ad5..02d0c7e334 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4465,6 +4465,7 @@ dependencies = [ "pometry-storage", "rand 0.8.5", "raphtory", + "raphtory-api", "raphtory-graphql", "rayon", "sorted_vector_map", diff --git a/pometry-storage-private b/pometry-storage-private index 7a99bad373..45f16ded88 160000 --- a/pometry-storage-private +++ b/pometry-storage-private @@ -1 +1 @@ -Subproject commit 7a99bad37344cbbf985b842d989771edea135387 +Subproject commit 45f16ded88a34586c75a6e45133220a84cf0bdd1 diff --git a/python/tests/test_graphdb/graph.bincode b/python/tests/test_graphdb/graph.bincode index 731d8cee4d..fe30207c97 100644 Binary files a/python/tests/test_graphdb/graph.bincode and b/python/tests/test_graphdb/graph.bincode differ diff --git a/raphtory-benchmark/Cargo.toml b/raphtory-benchmark/Cargo.toml index f9f773c1a4..2984484ec4 100644 --- a/raphtory-benchmark/Cargo.toml +++ b/raphtory-benchmark/Cargo.toml @@ -7,7 +7,8 @@ edition = "2021" [dependencies] criterion = { workspace = true } -raphtory = { path = "../raphtory", features = ["io"] } +raphtory = { path = "../raphtory", features = ["io"], version = "0.9.3" } +raphtory-api = { path = "../raphtory-api", version = "0.9.3" } raphtory-graphql = { path = "../raphtory-graphql", version = "0.9.3" } pometry-storage.workspace = true sorted_vector_map = { workspace = true } diff --git a/raphtory-benchmark/benches/arrow_algobench.rs b/raphtory-benchmark/benches/arrow_algobench.rs index a1b7d913ad..245f720ca4 100644 --- a/raphtory-benchmark/benches/arrow_algobench.rs +++ b/raphtory-benchmark/benches/arrow_algobench.rs @@ -1,187 +1,180 @@ -use crate::common::bench; -use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode}; -use raphtory::{ - algorithms::{ - centrality::pagerank::unweighted_page_rank, - components::weakly_connected_components, - metrics::{ - clustering_coefficient::clustering_coefficient, - local_clustering_coefficient::local_clustering_coefficient, - }, - motifs::local_triangle_count::local_triangle_count, - }, - graphgen::random_attachment::random_attachment, - prelude::*, -}; -use rayon::prelude::*; -use tempfile::TempDir; - mod common; -//TODO swap to new trianglecount -// pub fn global_triangle_count_analysis(c: &mut Criterion) { -// let mut group = c.benchmark_group("global_triangle_count"); -// group.sample_size(10); -// bench(&mut group, "global_triangle_count", None, |b| { -// let g = raphtory_db::graph_loader::lotr_graph::lotr_graph(1); -// let windowed_graph = g.window(i64::MIN, i64::MAX); -// b.iter(|| { -// global_triangle_count(&windowed_graph).unwrap(); -// }); -// }); -// -// group.finish(); -// } - -pub fn local_triangle_count_analysis(c: &mut Criterion) { - let mut group = c.benchmark_group("local_triangle_count"); - group.sample_size(10); - bench(&mut group, "local_triangle_count", None, |b| { - let g = raphtory::graph_loader::lotr_graph::lotr_graph(); - let test_dir = TempDir::new().unwrap(); - let g = g.persist_as_disk_graph(test_dir.path()).unwrap(); - let windowed_graph = g.window(i64::MIN, i64::MAX); - - b.iter(|| { - let node_ids = windowed_graph.nodes().collect(); - - node_ids.into_par_iter().for_each(|v| { - local_triangle_count(&windowed_graph, v).unwrap(); - }); - }) - }); - - group.finish(); -} - -pub fn local_clustering_coefficient_analysis(c: &mut Criterion) { - let mut group = c.benchmark_group("local_clustering_coefficient"); - - bench(&mut group, "local_clustering_coefficient", None, |b| { - let g: Graph = Graph::new(); - - let vs = vec![ - (1, 2, 1), - (1, 3, 2), - (1, 4, 3), - (3, 1, 4), - (3, 4, 5), - (3, 5, 6), - (4, 5, 7), - (5, 6, 8), - (5, 8, 9), - (7, 5, 10), - (8, 5, 11), - (1, 9, 12), - (9, 1, 13), - (6, 3, 14), - (4, 8, 15), - (8, 3, 16), - (5, 10, 17), - (10, 5, 18), - (10, 8, 19), - (1, 11, 20), - (11, 1, 21), - (9, 11, 22), - (11, 9, 23), - ]; - - for (src, dst, t) in &vs { - g.add_edge(*t, *src, *dst, NO_PROPS, None).unwrap(); - } - - let test_dir = TempDir::new().unwrap(); - let g = g.persist_as_disk_graph(test_dir.path()).unwrap(); - - let windowed_graph = g.window(0, 5); - b.iter(|| local_clustering_coefficient(&windowed_graph, 1)) - }); - - group.finish(); -} - -pub fn graphgen_large_clustering_coeff(c: &mut Criterion) { - let mut group = c.benchmark_group("graphgen_large_clustering_coeff"); - // generate graph - let graph = Graph::new(); - let seed: [u8; 32] = [1; 32]; - random_attachment(&graph, 500000, 4, Some(seed)); - - let test_dir = TempDir::new().unwrap(); - let graph = graph.persist_as_disk_graph(test_dir.path()).unwrap(); - - group.sampling_mode(SamplingMode::Flat); - group.measurement_time(std::time::Duration::from_secs(60)); - group.sample_size(10); - group.bench_with_input( - BenchmarkId::new("graphgen_large_clustering_coeff", &graph), - &graph, - |b, graph| { - b.iter(|| { - let result = clustering_coefficient(graph); - black_box(result); - }); +#[cfg(feature = "storage")] +pub mod arrow_bench { + + use crate::common::bench; + use criterion::{ + black_box, criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode, + }; + use raphtory::{ + algorithms::{ + centrality::pagerank::unweighted_page_rank, + components::weakly_connected_components, + metrics::{ + clustering_coefficient::clustering_coefficient, + local_clustering_coefficient::local_clustering_coefficient, + }, + motifs::local_triangle_count::local_triangle_count, }, - ); - group.finish() -} + graphgen::random_attachment::random_attachment, + prelude::*, + }; + use rayon::prelude::*; + use tempfile::TempDir; + + pub fn local_triangle_count_analysis(c: &mut Criterion) { + let mut group = c.benchmark_group("local_triangle_count"); + group.sample_size(10); + bench(&mut group, "local_triangle_count", None, |b| { + let g = raphtory::graph_loader::lotr_graph::lotr_graph(); + let test_dir = TempDir::new().unwrap(); + let g = g.persist_as_disk_graph(test_dir.path()).unwrap(); + let windowed_graph = g.window(i64::MIN, i64::MAX); -pub fn graphgen_large_pagerank(c: &mut Criterion) { - let mut group = c.benchmark_group("graphgen_large_pagerank"); - // generate graph - let graph = Graph::new(); - let seed: [u8; 32] = [1; 32]; - random_attachment(&graph, 500000, 4, Some(seed)); - - let test_dir = TempDir::new().unwrap(); - let graph = graph.persist_as_disk_graph(test_dir.path()).unwrap(); - group.sampling_mode(SamplingMode::Flat); - group.measurement_time(std::time::Duration::from_secs(20)); - group.sample_size(10); - group.bench_with_input( - BenchmarkId::new("graphgen_large_pagerank", &graph), - &graph, - |b, graph| { b.iter(|| { - let result = unweighted_page_rank(graph, Some(100), None, None, true, None); - black_box(result); - }); - }, - ); - group.finish() -} + let node_ids = windowed_graph.nodes().collect(); + + node_ids.into_par_iter().for_each(|v| { + local_triangle_count(&windowed_graph, v).unwrap(); + }); + }) + }); + + group.finish(); + } + + pub fn local_clustering_coefficient_analysis(c: &mut Criterion) { + let mut group = c.benchmark_group("local_clustering_coefficient"); + + bench(&mut group, "local_clustering_coefficient", None, |b| { + let g: Graph = Graph::new(); + + let vs = vec![ + (1, 2, 1), + (1, 3, 2), + (1, 4, 3), + (3, 1, 4), + (3, 4, 5), + (3, 5, 6), + (4, 5, 7), + (5, 6, 8), + (5, 8, 9), + (7, 5, 10), + (8, 5, 11), + (1, 9, 12), + (9, 1, 13), + (6, 3, 14), + (4, 8, 15), + (8, 3, 16), + (5, 10, 17), + (10, 5, 18), + (10, 8, 19), + (1, 11, 20), + (11, 1, 21), + (9, 11, 22), + (11, 9, 23), + ]; + + for (src, dst, t) in &vs { + g.add_edge(*t, *src, *dst, NO_PROPS, None).unwrap(); + } + + let test_dir = TempDir::new().unwrap(); + let g = g.persist_as_disk_graph(test_dir.path()).unwrap(); + + let windowed_graph = g.window(0, 5); + b.iter(|| local_clustering_coefficient(&windowed_graph, 1)) + }); + + group.finish(); + } + + pub fn graphgen_large_clustering_coeff(c: &mut Criterion) { + let mut group = c.benchmark_group("graphgen_large_clustering_coeff"); + // generate graph + let graph = Graph::new(); + let seed: [u8; 32] = [1; 32]; + random_attachment(&graph, 500000, 4, Some(seed)); -pub fn graphgen_large_concomp(c: &mut Criterion) { - let mut group = c.benchmark_group("graphgen_large_concomp"); - // generate graph - let graph = Graph::new(); - let seed: [u8; 32] = [1; 32]; - random_attachment(&graph, 500000, 4, Some(seed)); - let test_dir = TempDir::new().unwrap(); - let graph = graph.persist_as_disk_graph(test_dir.path()).unwrap(); - - group.sampling_mode(SamplingMode::Flat); - group.measurement_time(std::time::Duration::from_secs(60)); - group.sample_size(10); - group.bench_with_input( - BenchmarkId::new("graphgen_large_concomp", &graph), - &graph, - |b, graph| { - b.iter(|| { - let result = weakly_connected_components(graph, 20, None); - black_box(result); - }); - }, - ); - group.finish() + let test_dir = TempDir::new().unwrap(); + let graph = graph.persist_as_disk_graph(test_dir.path()).unwrap(); + + group.sampling_mode(SamplingMode::Flat); + group.measurement_time(std::time::Duration::from_secs(60)); + group.sample_size(10); + group.bench_with_input( + BenchmarkId::new("graphgen_large_clustering_coeff", &graph), + &graph, + |b, graph| { + b.iter(|| { + let result = clustering_coefficient(graph); + black_box(result); + }); + }, + ); + group.finish() + } + + pub fn graphgen_large_pagerank(c: &mut Criterion) { + let mut group = c.benchmark_group("graphgen_large_pagerank"); + // generate graph + let graph = Graph::new(); + let seed: [u8; 32] = [1; 32]; + random_attachment(&graph, 500000, 4, Some(seed)); + + let test_dir = TempDir::new().unwrap(); + let graph = graph.persist_as_disk_graph(test_dir.path()).unwrap(); + group.sampling_mode(SamplingMode::Flat); + group.measurement_time(std::time::Duration::from_secs(20)); + group.sample_size(10); + group.bench_with_input( + BenchmarkId::new("graphgen_large_pagerank", &graph), + &graph, + |b, graph| { + b.iter(|| { + let result = unweighted_page_rank(graph, Some(100), None, None, true, None); + black_box(result); + }); + }, + ); + group.finish() + } + + pub fn graphgen_large_concomp(c: &mut Criterion) { + let mut group = c.benchmark_group("graphgen_large_concomp"); + // generate graph + let graph = Graph::new(); + let seed: [u8; 32] = [1; 32]; + random_attachment(&graph, 500000, 4, Some(seed)); + let test_dir = TempDir::new().unwrap(); + let graph = graph.persist_as_disk_graph(test_dir.path()).unwrap(); + + group.sampling_mode(SamplingMode::Flat); + group.measurement_time(std::time::Duration::from_secs(60)); + group.sample_size(10); + group.bench_with_input( + BenchmarkId::new("graphgen_large_concomp", &graph), + &graph, + |b, graph| { + b.iter(|| { + let result = weakly_connected_components(graph, 20, None); + black_box(result); + }); + }, + ); + group.finish() + } } +#[cfg(feature = "storage")] criterion_group!( benches, - local_triangle_count_analysis, - local_clustering_coefficient_analysis, - graphgen_large_clustering_coeff, - graphgen_large_pagerank, - graphgen_large_concomp, + arrow_bench.local_triangle_count_analysis, + arrow_bench.local_clustering_coefficient_analysis, + arrow_bench.graphgen_large_clustering_coeff, + arrow_bench.graphgen_large_pagerank, + arrow_bench.graphgen_large_concomp, ); +#[cfg(feature = "storage")] criterion_main!(benches); diff --git a/raphtory-benchmark/benches/edge_add.rs b/raphtory-benchmark/benches/edge_add.rs index 81a124c90b..1bef0528c4 100644 --- a/raphtory-benchmark/benches/edge_add.rs +++ b/raphtory-benchmark/benches/edge_add.rs @@ -1,5 +1,5 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use raphtory::{core::entities::nodes::input_node::InputNode, prelude::*}; +use raphtory::prelude::*; mod common; use rand::{ diff --git a/raphtory-benchmark/benches/graph_ops.rs b/raphtory-benchmark/benches/graph_ops.rs index 9aee4f630c..3ce0cb0efb 100644 --- a/raphtory-benchmark/benches/graph_ops.rs +++ b/raphtory-benchmark/benches/graph_ops.rs @@ -1,13 +1,11 @@ use common::run_graph_ops_benches; use criterion::{criterion_group, criterion_main, Criterion}; use raphtory::{ - core::utils::hashing::calculate_hash, - graph_loader::{ - example::sx_superuser_graph::{sx_superuser_file, sx_superuser_graph, TEdge}, - source::csv_loader::CsvLoader, - }, + graph_loader::sx_superuser_graph::{sx_superuser_file, sx_superuser_graph, TEdge}, + io::csv_loader::CsvLoader, prelude::*, }; +use raphtory_api::core::utils::hashing::calculate_hash; mod common; diff --git a/raphtory/src/core/entities/edges/edge_store.rs b/raphtory/src/core/entities/edges/edge_store.rs index edb1fea95b..cb2376f026 100644 --- a/raphtory/src/core/entities/edges/edge_store.rs +++ b/raphtory/src/core/entities/edges/edge_store.rs @@ -6,42 +6,57 @@ use crate::{ }, storage::{ lazy_vec::IllegalSet, - timeindex::{TimeIndex, TimeIndexEntry, TimeIndexIntoOps, TimeIndexOps}, - ArcEntry, + raw_edges::EdgeArcGuard, + timeindex::{TimeIndexEntry, TimeIndexIntoOps}, }, - utils::errors::GraphError, + utils::{errors::GraphError, iter::GenLockedIter}, Prop, }, db::api::{ storage::edges::edge_storage_ops::{EdgeStorageIntoOps, EdgeStorageOps}, - view::{BoxedLIter, IntoDynBoxed}, + view::IntoDynBoxed, }, }; - +use itertools::Itertools; use raphtory_api::core::entities::edges::edge_ref::EdgeRef; pub use raphtory_api::core::entities::edges::*; - -use itertools::{EitherOrBoth, Itertools}; -use ouroboros::self_referencing; use serde::{Deserialize, Serialize}; -use std::{ - iter, - ops::{DerefMut, Range}, -}; +use std::ops::{Deref, Range}; -#[derive(Serialize, Deserialize, Debug, Default, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Debug, Default, PartialEq)] pub struct EdgeStore { pub(crate) eid: EID, pub(crate) src: VID, pub(crate) dst: VID, - pub(crate) data: Vec, } -#[derive(Serialize, Deserialize, Debug, Default, PartialEq)] -pub struct EdgeData { - pub(crate) layer: EdgeLayer, - pub(crate) additions: TimeIndex, - pub(crate) deletions: TimeIndex, +pub trait EdgeDataLike<'a> { + fn temporal_prop_ids(self) -> impl Iterator + 'a; + fn const_prop_ids(self) -> impl Iterator + 'a; +} + +impl<'a, T: Deref + 'a> EdgeDataLike<'a> for T { + fn temporal_prop_ids(self) -> impl Iterator + 'a { + GenLockedIter::from(self, |layer| { + Box::new( + layer + .props() + .into_iter() + .flat_map(|props| props.temporal_prop_ids()), + ) + }) + } + + fn const_prop_ids(self) -> impl Iterator + 'a { + GenLockedIter::from(self, |layer| { + Box::new( + layer + .props() + .into_iter() + .flat_map(|props| props.const_prop_ids()), + ) + }) + } } #[derive(Serialize, Deserialize, Debug, Default, PartialEq)] @@ -54,6 +69,10 @@ impl EdgeLayer { self.props.as_ref() } + pub fn into_props(self) -> Option { + self.props + } + pub fn add_prop( &mut self, t: TimeIndexEntry, @@ -78,13 +97,6 @@ impl EdgeLayer { props.update_constant_prop(prop_id, prop) } - pub(crate) fn const_prop_ids(&self) -> impl Iterator + '_ { - self.props - .as_ref() - .into_iter() - .flat_map(|props| props.const_prop_ids()) - } - pub(crate) fn const_prop(&self, prop_id: usize) -> Option<&Prop> { self.props.as_ref().and_then(|ps| ps.const_prop(prop_id)) } @@ -95,270 +107,33 @@ impl EdgeLayer { } impl EdgeStore { - pub fn as_edge_ref(&self) -> EdgeRef { - EdgeRef::new_outgoing(self.eid, self.src, self.dst) - } - - pub fn internal_num_layers(&self) -> usize { - self.data.len() - } - - fn get_or_allocate_layer(&mut self, layer_id: usize) -> &mut EdgeLayer { - if self.data.len() <= layer_id { - self.data.resize_with(layer_id + 1, Default::default); - } - &mut self.data[layer_id].layer - } - - pub fn has_layer_inner(&self, layer_id: usize) -> bool { - self.get_additions(layer_id) - .filter(|t_index| !t_index.is_empty()) - .is_some() - || self - .get_deletions(layer_id) - .filter(|t_index| !t_index.is_empty()) - .is_some() - } - - pub fn layer_iter(&self) -> impl Iterator + '_ { - self.data.iter() - } - - /// Iterate over (layer_id, additions, deletions) triplets for edge - pub fn updates_iter_inner<'a>( - &'a self, - layers: &'a LayerIds, - ) -> impl Iterator< - Item = ( - usize, - &'a TimeIndex, - &'a TimeIndex, - ), - > + 'a { - match layers { - LayerIds::None => Box::new(iter::empty()), - LayerIds::All => self - .additions_iter_inner(layers) - .zip_longest(self.deletions_iter_inner(layers)) - .enumerate() - .map(|(l, zipped)| match zipped { - EitherOrBoth::Both(additions, deletions) => (l, additions, deletions), - EitherOrBoth::Left(additions) => (l, additions, &TimeIndex::Empty), - EitherOrBoth::Right(deletions) => (l, &TimeIndex::Empty, deletions), - }) - .into_dyn_boxed(), - LayerIds::One(id) => Box::new(iter::once(( - *id, - self.get_additions(*id).unwrap_or(&TimeIndex::Empty), - self.get_deletions(*id).unwrap_or(&TimeIndex::Empty), - ))), - LayerIds::Multiple(ids) => Box::new(ids.iter().map(|id| { - ( - *id, - self.get_additions(*id).unwrap_or(&TimeIndex::Empty), - self.get_deletions(*id).unwrap_or(&TimeIndex::Empty), - ) - })), - } - } - - pub fn additions_iter_inner<'a>( - &'a self, - layers: &'a LayerIds, - ) -> BoxedLIter<'a, &TimeIndex> { - match layers { - LayerIds::None => iter::empty().into_dyn_boxed(), - LayerIds::All => self.iter_additions().into_dyn_boxed(), - LayerIds::One(id) => self.get_additions(*id).into_iter().into_dyn_boxed(), - LayerIds::Multiple(ids) => ids - .iter() - .flat_map(|id| self.get_additions(*id)) - .into_dyn_boxed(), - } - } - - pub fn deletions_iter_inner<'a>( - &'a self, - layers: &'a LayerIds, - ) -> BoxedLIter<'a, &TimeIndex> { - match layers { - LayerIds::None => iter::empty().into_dyn_boxed(), - LayerIds::All => self.iter_deletions().into_dyn_boxed(), - LayerIds::One(id) => self.get_deletions(*id).into_iter().into_dyn_boxed(), - LayerIds::Multiple(ids) => ids - .iter() - .flat_map(|id| self.get_deletions(*id)) - .into_dyn_boxed(), - } - } - - pub fn layer_ids_window_iter(&self, w: Range) -> impl Iterator + '_ { - let layer_ids = self - .iter_additions() - .enumerate() - .zip_longest(self.iter_deletions().enumerate()) - .flat_map(move |e| match e { - EitherOrBoth::Both((i, t1), (_, t2)) => { - if t1.contains(w.clone()) || t2.contains(w.clone()) { - Some(i) - } else { - None - } - } - EitherOrBoth::Left((i, t)) => { - if t.contains(w.clone()) { - Some(i) - } else { - None - } - } - EitherOrBoth::Right((i, t)) => { - if t.contains(w.clone()) { - Some(i) - } else { - None - } - } - }); - - layer_ids - } - pub fn new(src: VID, dst: VID) -> Self { Self { eid: 0.into(), src, dst, - data: Vec::with_capacity(1), } } - pub fn layer(&self, layer_id: usize) -> Option<&EdgeLayer> { - self.data.get(layer_id).map(|data| &data.layer) - } - - /// an edge is active in a window if it has an addition event in any of the layers - pub fn active(&self, layer_ids: &LayerIds, w: Range) -> bool { - match layer_ids { - LayerIds::None => false, - LayerIds::All => self - .iter_additions() - .any(|t_index| t_index.contains(w.clone())), - LayerIds::One(l_id) => self - .get_additions(*l_id) - .map(|t_index| t_index.contains(w)) - .unwrap_or(false), - LayerIds::Multiple(layers) => layers - .iter() - .any(|l_id| self.active(&LayerIds::One(*l_id), w.clone())), - } - } - - pub fn last_deletion(&self, layer_ids: &LayerIds) -> Option { - match layer_ids { - LayerIds::None => None, - LayerIds::All => self.iter_deletions().flat_map(|d| d.last()).max(), - LayerIds::One(id) => self.get_deletions(*id).and_then(|t| t.last()), - LayerIds::Multiple(ids) => ids - .iter() - .flat_map(|id| self.get_deletions(*id).and_then(|t| t.last())) - .max(), - } - } - - pub fn last_addition(&self, layer_ids: &LayerIds) -> Option { - match layer_ids { - LayerIds::None => None, - LayerIds::All => self.iter_additions().flat_map(|d| d.last()).max(), - LayerIds::One(id) => self.get_additions(*id).and_then(|t| t.last()), - LayerIds::Multiple(ids) => ids - .iter() - .flat_map(|id| self.get_additions(*id).and_then(|t| t.last())) - .max(), - } - } - - pub fn temporal_prop_layer_inner(&self, layer_id: usize, prop_id: usize) -> Option<&TProp> { - self.data - .get(layer_id) - .and_then(|layer| layer.layer.temporal_property(prop_id)) - } - - pub fn layer_mut(&mut self, layer_id: usize) -> impl DerefMut + '_ { - self.get_or_allocate_layer(layer_id) - } - - pub fn deletions_mut(&mut self, layer_id: usize) -> &mut TimeIndex { - if self.data.len() <= layer_id { - self.data.resize_with(layer_id + 1, Default::default); - } - &mut self.data[layer_id].deletions - } - - pub fn additions_mut(&mut self, layer_id: usize) -> &mut TimeIndex { - if self.data.len() <= layer_id { - self.data.resize_with(layer_id + 1, Default::default); - } - &mut self.data[layer_id].additions - } - - pub(crate) fn temp_prop_ids( - &self, - layer_id: Option, - ) -> Box + '_> { - if let Some(layer_id) = layer_id { - Box::new(self.data.get(layer_id).into_iter().flat_map(|layer| { - layer - .layer - .props() - .into_iter() - .flat_map(|props| props.temporal_prop_ids()) - })) - } else { - Box::new( - self.data - .iter() - .flat_map(|layer| layer.layer.props().map(|prop| prop.temporal_prop_ids())) - .kmerge() - .dedup(), - ) - } - } - - pub fn get_additions(&self, layer_id: usize) -> Option<&TimeIndex> { - self.data.get(layer_id).map(|data| &data.additions) - } - - pub fn get_deletions(&self, layer_id: usize) -> Option<&TimeIndex> { - self.data.get(layer_id).map(|data| &data.deletions) - } - - pub fn iter_additions(&self) -> impl Iterator> + '_ { - self.data.iter().map(|data| &data.additions) - } - - pub fn iter_deletions(&self) -> impl Iterator> + '_ { - self.data.iter().map(|data| &data.deletions) + pub fn as_edge_ref(&self) -> EdgeRef { + EdgeRef::new_outgoing(self.eid, self.src, self.dst) } } -impl EdgeStorageIntoOps for ArcEntry { +impl EdgeStorageIntoOps for EdgeArcGuard { fn into_layers( self, layer_ids: LayerIds, eref: EdgeRef, ) -> impl Iterator + Send { let layer_ids = layer_ids.constrain_from_edge(eref); - ExplodedIterBuilder { - entry: self, - layer_ids, - iter_builder: move |edge, layer_ids| { - edge.layer_ids_iter(layer_ids) - .map(move |l| eref.at_layer(l)) - .into_dyn_boxed() - }, - } - .build() + GenLockedIter::from((self, layer_ids), |(edge, layers)| { + Box::new( + edge.as_mem_edge() + .layer_ids_iter(layers) + .map(move |l| eref.at_layer(l)), + ) + }) } fn into_exploded( @@ -367,17 +142,13 @@ impl EdgeStorageIntoOps for ArcEntry { eref: EdgeRef, ) -> impl Iterator + Send { let layer_ids = layer_ids.constrain_from_edge(eref); - ExplodedIterBuilder { - entry: self, - layer_ids, - iter_builder: move |edge, layers| { - edge.additions_iter(layers) - .map(move |(l, a)| a.into_iter().map(move |t| eref.at(t).at_layer(l))) - .kmerge_by(|e1, e2| e1.time() <= e2.time()) - .into_dyn_boxed() - }, - } - .build() + GenLockedIter::from((self, layer_ids, eref), |(edge, layers, eref)| { + edge.as_mem_edge() + .additions_iter(layers) + .map(move |(l, a)| a.into_iter().map(move |t| eref.at(t).at_layer(l))) + .kmerge_by(|e1, e2| e1.time() <= e2.time()) + .into_dyn_boxed() + }) } fn into_exploded_window( @@ -387,36 +158,16 @@ impl EdgeStorageIntoOps for ArcEntry { eref: EdgeRef, ) -> impl Iterator + Send { let layer_ids = layer_ids.constrain_from_edge(eref); - ExplodedIterBuilder { - entry: self, - layer_ids, - iter_builder: move |edge, layers| { - edge.additions_iter(layers) + GenLockedIter::from((self, layer_ids, w), |(edge, layers, w)| { + Box::new( + edge.as_mem_edge() + .additions_iter(layers) .flat_map(move |(l, a)| { a.into_range(w.clone()) .into_iter() .map(move |t| eref.at(t).at_layer(l)) - }) - .into_dyn_boxed() - }, - } - .build() - } -} - -#[self_referencing] -pub struct ExplodedIter { - entry: ArcEntry, - layer_ids: LayerIds, - #[borrows(entry, layer_ids)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl Iterator for ExplodedIter { - type Item = EdgeRef; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) + }), + ) + }) } } diff --git a/raphtory/src/core/entities/graph/mod.rs b/raphtory/src/core/entities/graph/mod.rs index 3923ab8a20..75a98c5915 100644 --- a/raphtory/src/core/entities/graph/mod.rs +++ b/raphtory/src/core/entities/graph/mod.rs @@ -38,7 +38,7 @@ mod test { ) .unwrap(); - let first = g.inner().storage.nodes.get(v1); + let first = g.inner().storage.nodes.entry(v1); let ns = first .neighbours(&vec![l_btc, l_eth].into(), Direction::OUT) diff --git a/raphtory/src/core/entities/graph/tgraph.rs b/raphtory/src/core/entities/graph/tgraph.rs index 3f1a7625ab..f2853bbf08 100644 --- a/raphtory/src/core/entities/graph/tgraph.rs +++ b/raphtory/src/core/entities/graph/tgraph.rs @@ -12,8 +12,9 @@ use crate::{ }, storage::{ locked_view::LockedView, + raw_edges::EdgeWGuard, timeindex::{AsTime, TimeIndexEntry}, - Entry, EntryMut, + EntryMut, }, utils::errors::GraphError, Direction, Prop, @@ -52,8 +53,8 @@ impl InternalGraph { } pub(crate) fn lock(&self) -> LockedGraph { - let nodes = Arc::new(self.inner().storage.nodes.read_lock()); - let edges = Arc::new(self.inner().storage.edges.read_lock()); + let nodes = Arc::new(self.inner().storage.nodes_read_lock()); + let edges = Arc::new(self.inner().storage.edges_read_lock()); LockedGraph { nodes, edges } } @@ -105,8 +106,8 @@ impl std::fmt::Display for InternalGraph { write!( f, "Graph(num_nodes={}, num_edges={})", - self.inner().storage.nodes.len(), - self.inner().storage.edges.len() + self.inner().storage.nodes_len(), + self.inner().storage.edges_len() ) } } @@ -260,16 +261,6 @@ impl TemporalGraph { pub(crate) fn get_all_node_types(&self) -> Vec { self.node_meta.get_all_node_types() } - - #[inline] - pub(crate) fn node_entry(&self, v: VID) -> Entry<'_, NodeStore> { - self.storage.get_node(v) - } - - #[inline] - pub(crate) fn edge_entry(&self, e: EID) -> Entry<'_, EdgeStore> { - self.storage.get_edge(e) - } } impl TemporalGraph { @@ -417,7 +408,7 @@ impl TemporalGraph { Ok(()) } - fn link_nodes Result<(), GraphError>>( + fn link_nodes Result<(), GraphError>>( &self, src_id: VID, dst_id: VID, @@ -436,9 +427,10 @@ impl TemporalGraph { edge_id } None => { - let mut edge = EdgeStore::new(src_id, dst_id); + let mut edge = self.storage.push_edge(EdgeStore::new(src_id, dst_id)); + let eid = edge.edge_store().eid; edge_fn(&mut edge)?; - self.storage.push_edge(edge) + eid } }; @@ -461,9 +453,11 @@ impl TemporalGraph { // get the entries for the src and dst nodes self.link_nodes(src_id, dst_id, t, layer, move |edge| { edge.additions_mut(layer).insert(t); - let mut edge_layer = edge.layer_mut(layer); - for (prop_id, prop_value) in props { - edge_layer.add_prop(t, prop_id, prop_value)?; + if !props.is_empty() { + let edge_layer = edge.layer_mut(layer); + for (prop_id, prop_value) in props { + edge_layer.add_prop(t, prop_id, prop_value)?; + } } Ok(()) }) diff --git a/raphtory/src/core/entities/graph/tgraph_storage.rs b/raphtory/src/core/entities/graph/tgraph_storage.rs index 58f17beee4..efdda59595 100644 --- a/raphtory/src/core/entities/graph/tgraph_storage.rs +++ b/raphtory/src/core/entities/graph/tgraph_storage.rs @@ -1,6 +1,10 @@ use crate::core::{ entities::{edges::edge_store::EdgeStore, nodes::node_store::NodeStore, EID, VID}, - storage::{self, Entry, EntryMut, PairEntryMut}, + storage::{ + self, + raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard, EdgesStorage, LockedEdges}, + Entry, EntryMut, PairEntryMut, + }, }; use serde::{Deserialize, Serialize}; @@ -9,28 +13,43 @@ pub(crate) struct GraphStorage { // node storage with having (id, time_index, properties, adj list for each layer) pub(crate) nodes: storage::RawStorage, - // edge storage with having (src, dst, time_index, properties) for each layer - pub(crate) edges: storage::RawStorage, + edges: EdgesStorage, } impl GraphStorage { pub(crate) fn new(num_locks: usize) -> Self { Self { nodes: storage::RawStorage::new(num_locks), - edges: storage::RawStorage::new(num_locks), + edges: EdgesStorage::new(), } } + pub fn nodes_read_lock(&self) -> storage::ReadLockedStorage { + self.nodes.read_lock() + } + + pub fn edges_read_lock(&self) -> LockedEdges { + self.edges.read_lock() + } + + pub fn nodes_len(&self) -> usize { + self.nodes.len() + } + + #[inline] + pub fn edges_len(&self) -> usize { + self.edges.len() + } + + #[inline] pub(crate) fn push_node(&self, node: NodeStore) -> VID { self.nodes .push(node, |vid, node| node.vid = vid.into()) .into() } - - pub(crate) fn push_edge(&self, edge: EdgeStore) -> EID { - self.edges - .push(edge, |eid, edge| edge.eid = eid.into()) - .into() + #[inline] + pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard { + self.edges.push_edge(edge) } #[inline] @@ -39,8 +58,8 @@ impl GraphStorage { } #[inline] - pub(crate) fn get_edge_mut(&self, id: EID) -> EntryMut<'_, EdgeStore> { - self.edges.entry_mut(id) + pub(crate) fn get_edge_mut(&self, eid: EID) -> EdgeWGuard { + self.edges.get_edge_mut(eid) } #[inline] @@ -49,10 +68,16 @@ impl GraphStorage { } #[inline] - pub(crate) fn get_edge(&self, id: EID) -> Entry<'_, EdgeStore> { - self.edges.entry(id) + pub(crate) fn edge_entry(&self, eid: EID) -> EdgeRGuard { + self.edges.get_edge(eid) } + #[inline] + pub(crate) fn get_edge_arc(&self, eid: EID) -> EdgeArcGuard { + self.edges.get_edge_arc(eid) + } + + #[inline] pub(crate) fn pair_node_mut(&self, i: VID, j: VID) -> PairEntryMut<'_, NodeStore> { self.nodes.pair_entry_mut(i, j) } diff --git a/raphtory/src/core/entities/nodes/node_store.rs b/raphtory/src/core/entities/nodes/node_store.rs index 7b9ae2e969..38408b1041 100644 --- a/raphtory/src/core/entities/nodes/node_store.rs +++ b/raphtory/src/core/entities/nodes/node_store.rs @@ -10,11 +10,10 @@ use crate::core::{ timeindex::{AsTime, TimeIndex, TimeIndexEntry}, ArcEntry, Entry, }, - utils::errors::GraphError, + utils::{errors::GraphError, iter::GenLockedIter}, Direction, Prop, }; use itertools::Itertools; -use ouroboros::self_referencing; use serde::{Deserialize, Serialize}; use std::{ iter, @@ -350,20 +349,12 @@ impl NodeStore { } impl ArcEntry { - pub fn into_edges(self, layers: &LayerIds, dir: Direction) -> LockedAdjIter { - LockedAdjIterBuilder { - entry: self, - iter_builder: |node| node.edge_tuples(layers, dir), - } - .build() + pub fn into_edges(self, layers: &LayerIds, dir: Direction) -> impl Iterator { + GenLockedIter::from(self, |node| node.edge_tuples(layers, dir)) } - pub fn into_neighbours(self, layers: &LayerIds, dir: Direction) -> LockedNeighboursIter { - LockedNeighboursIterBuilder { - entry: self, - iter_builder: |node| node.neighbours(layers, dir), - } - .build() + pub fn into_neighbours(self, layers: &LayerIds, dir: Direction) -> impl Iterator { + GenLockedIter::from(self, |node| node.neighbours(layers, dir)) } pub fn into_layers(self) -> LockedLayers { @@ -384,85 +375,28 @@ impl ArcEntry { } impl<'a> Entry<'a, NodeStore> { - pub fn into_neighbours(self, layers: &LayerIds, dir: Direction) -> LockedRefNeighboursIter<'a> { - LockedRefNeighboursIterBuilder { - entry: self, - iter_builder: |node| node.neighbours(layers, dir), - } - .build() - } - - pub fn into_edges(self, layers: &LayerIds, dir: Direction) -> LockedRefEdgesIter<'a> { - LockedRefEdgesIterBuilder { - entry: self, - iter_builder: |node| node.edge_tuples(layers, dir), - } - .build() - } -} - -#[self_referencing] -pub struct LockedAdjIter { - entry: ArcEntry, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl Iterator for LockedAdjIter { - type Item = EdgeRef; - - #[inline] - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) - } -} - -#[self_referencing] -pub struct LockedNeighboursIter { - entry: ArcEntry, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl Iterator for LockedNeighboursIter { - type Item = VID; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) + pub fn into_neighbours( + self, + layers: &LayerIds, + dir: Direction, + ) -> impl Iterator + 'a { + GenLockedIter::from(self, |node| node.neighbours(layers, dir)) } -} -#[self_referencing] -pub struct LockedRefNeighboursIter<'a> { - entry: Entry<'a, NodeStore>, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl<'a> Iterator for LockedRefNeighboursIter<'a> { - type Item = VID; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) + pub fn into_edges( + self, + layers: &LayerIds, + dir: Direction, + ) -> impl Iterator + 'a { + GenLockedIter::from(self, |node| node.edge_tuples(layers, dir)) } -} -#[self_referencing] -pub struct LockedRefEdgesIter<'a> { - entry: Entry<'a, NodeStore>, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl<'a> Iterator for LockedRefEdgesIter<'a> { - type Item = EdgeRef; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) + pub fn into_edges_iter( + self, + layers: &'a LayerIds, + dir: Direction, + ) -> impl Iterator + 'a { + GenLockedIter::from(self, |node| Box::new(node.edge_tuples(layers, dir))) } } diff --git a/raphtory/src/core/entities/properties/props.rs b/raphtory/src/core/entities/properties/props.rs index 9da6ab3625..12009daef5 100644 --- a/raphtory/src/core/entities/properties/props.rs +++ b/raphtory/src/core/entities/properties/props.rs @@ -98,7 +98,7 @@ impl Props { self.constant_props.filled_ids() } - pub fn temporal_prop_ids(&self) -> impl Iterator + '_ { + pub fn temporal_prop_ids(&self) -> impl Iterator + Send + '_ { self.temporal_props.filled_ids() } } diff --git a/raphtory/src/core/storage/lazy_vec.rs b/raphtory/src/core/storage/lazy_vec.rs index f422c84642..15a3586586 100644 --- a/raphtory/src/core/storage/lazy_vec.rs +++ b/raphtory/src/core/storage/lazy_vec.rs @@ -31,13 +31,13 @@ pub(crate) enum LazyVec { impl LazyVec where - A: PartialEq + Default + Clone + Debug, + A: PartialEq + Default + Clone + Debug + Send + Sync, { pub(crate) fn from(id: usize, value: A) -> Self { LazyVec::LazyVec1(id, value) } - pub(crate) fn filled_ids(&self) -> Box + '_> { + pub(crate) fn filled_ids(&self) -> Box + Send + '_> { match self { LazyVec::Empty => Box::new(iter::empty()), LazyVec::LazyVec1(id, _) => Box::new(iter::once(*id)), @@ -127,7 +127,7 @@ where let mut value = A::default(); updater(&mut value)?; self.set(id, value) - .expect("Set failed over a non existing value") + .map_err(|e| GraphError::IllegalSet(e.to_string()))?; } }; Ok(()) diff --git a/raphtory/src/core/storage/mod.rs b/raphtory/src/core/storage/mod.rs index 218454320d..71b5a4d487 100644 --- a/raphtory/src/core/storage/mod.rs +++ b/raphtory/src/core/storage/mod.rs @@ -3,6 +3,7 @@ pub(crate) mod iter; pub mod lazy_vec; pub mod locked_view; +pub mod raw_edges; pub mod sorted_vec_map; pub mod timeindex; @@ -190,10 +191,6 @@ where } } - pub fn indices(&self) -> impl Iterator + Send + '_ { - 0..self.len() - } - pub fn new(n_locks: usize) -> Self { let data: Box<[LockVec]> = (0..n_locks) .map(|_| LockVec::new()) @@ -226,14 +223,6 @@ where Entry { offset, guard } } - #[inline] - pub fn get(&self, index: Index) -> impl Deref + '_ { - let index = index.into(); - let (bucket, offset) = self.resolve(index); - let guard = self.data[bucket].data.read_recursive(); - RwLockReadGuard::map(guard, |guard| &guard[offset]) - } - pub fn entry_arc(&self, index: Index) -> ArcEntry { let index = index.into(); let (bucket, offset) = self.resolve(index); @@ -298,14 +287,6 @@ pub struct Entry<'a, T: 'static> { guard: RwLockReadGuard<'a, Vec>, } -impl<'a, T: 'static> Clone for Entry<'a, T> { - fn clone(&self) -> Self { - let guard = RwLockReadGuard::rwlock(&self.guard).read_recursive(); - let i = self.offset; - Self { offset: i, guard } - } -} - #[derive(Debug)] pub struct ArcEntry { guard: Arc>>, @@ -329,35 +310,11 @@ impl Deref for ArcEntry { } } -impl AsRef for ArcEntry -where - T: ?Sized, - S: AsRef, -{ - fn as_ref(&self) -> &T { - self.deref().as_ref() - } -} - -impl<'a, T> Entry<'a, T> { - pub fn value(&self) -> &T { - &self.guard[self.offset] - } - - pub fn map &U>(self, f: F) -> LockedView<'a, U> { - let mapped_guard = RwLockReadGuard::map(self.guard, |guard| { - let what = &guard[self.offset]; - f(what) - }); - LockedView::LockMapped(mapped_guard) - } -} - impl<'a, T> Deref for Entry<'a, T> { type Target = T; fn deref(&self) -> &Self::Target { - self.value() + &self.guard[self.offset] } } diff --git a/raphtory/src/core/storage/raw_edges.rs b/raphtory/src/core/storage/raw_edges.rs new file mode 100644 index 0000000000..0325400751 --- /dev/null +++ b/raphtory/src/core/storage/raw_edges.rs @@ -0,0 +1,312 @@ +use std::{ + ops::Deref, + sync::{ + atomic::{self, AtomicUsize}, + Arc, + }, +}; + +use lock_api::ArcRwLockReadGuard; +use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use rayon::prelude::*; +use serde::{Deserialize, Serialize}; + +use raphtory_api::core::{entities::EID, storage::timeindex::TimeIndexEntry}; + +use crate::{ + core::entities::{ + edges::edge_store::{EdgeDataLike, EdgeLayer, EdgeStore}, + LayerIds, + }, + db::api::storage::edges::edge_storage_ops::{EdgeStorageOps, MemEdge}, +}; + +use super::{resolve, timeindex::TimeIndex}; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub struct EdgeShard { + edge_ids: Vec, + props: Vec>, + additions: Vec>>, + deletions: Vec>>, +} + +impl EdgeShard { + pub fn insert(&mut self, index: usize, value: EdgeStore) { + if index >= self.edge_ids.len() { + self.edge_ids.resize_with(index + 1, Default::default); + } + self.edge_ids[index] = value; + } + + pub fn edge_store(&self, index: usize) -> &EdgeStore { + &self.edge_ids[index] + } + + pub fn internal_num_layers(&self) -> usize { + self.additions.len().max(self.deletions.len()) + } + + pub fn additions(&self, index: usize, layer_id: usize) -> Option<&TimeIndex> { + self.additions.get(layer_id).and_then(|add| add.get(index)) + } + + pub fn deletions(&self, index: usize, layer_id: usize) -> Option<&TimeIndex> { + self.deletions.get(layer_id).and_then(|del| del.get(index)) + } + + pub fn props(&self, index: usize, layer_id: usize) -> Option<&EdgeLayer> { + self.props.get(layer_id).and_then(|props| props.get(index)) + } + + pub fn props_iter(&self, index: usize) -> impl Iterator { + self.props + .iter() + .enumerate() + .filter_map(move |(id, layer)| layer.get(index).map(|l| (id, l))) + } +} + +pub const SHARD_SIZE: usize = 64; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EdgesStorage { + shards: Arc<[Arc>]>, + len: Arc, +} + +impl PartialEq for EdgesStorage { + fn eq(&self, other: &Self) -> bool { + self.shards.len() == other.shards.len() + && self + .shards + .iter() + .zip(other.shards.iter()) + .all(|(a, b)| a.read().eq(&b.read())) + } +} + +impl EdgesStorage { + pub fn new() -> Self { + let mut shards = (0..SHARD_SIZE).into_iter().map(|_| { + Arc::new(RwLock::new(EdgeShard { + edge_ids: vec![], + props: Vec::with_capacity(0), + additions: Vec::with_capacity(1), + deletions: Vec::with_capacity(0), + })) + }); + EdgesStorage { + shards: shards.collect(), + len: Arc::new(AtomicUsize::new(0)), + } + } + + #[inline] + pub fn len(&self) -> usize { + self.len.load(atomic::Ordering::SeqCst) + } + + pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard { + let (eid, mut edge) = self.push(edge); + edge.edge_store_mut().eid = eid; + edge + } + + pub fn read_lock(&self) -> LockedEdges { + LockedEdges { + shards: self.shards.iter().map(|shard| shard.read_arc()).collect(), + len: self.len(), + } + } + + #[inline] + fn resolve(&self, index: usize) -> (usize, usize) { + resolve(index, self.shards.len()) + } + + fn push(&self, mut value: EdgeStore) -> (EID, EdgeWGuard) { + let index = self.len.fetch_add(1, atomic::Ordering::Relaxed); + let (bucket, offset) = self.resolve(index); + let mut shard = self.shards[bucket].write(); + shard.insert(offset, value); + let guard = EdgeWGuard { + guard: shard, + i: offset, + }; + (index.into(), guard) + } + + pub fn get_edge_mut(&self, eid: EID) -> EdgeWGuard { + let (bucket, offset) = self.resolve(eid.into()); + EdgeWGuard { + guard: self.shards[bucket].write(), + i: offset, + } + } + + pub fn get_edge(&self, eid: EID) -> EdgeRGuard { + let (bucket, offset) = self.resolve(eid.into()); + EdgeRGuard { + guard: self.shards[bucket].read(), + offset, + } + } + + pub fn get_edge_arc(&self, eid: EID) -> EdgeArcGuard { + let (bucket, offset) = self.resolve(eid.into()); + let guard = Arc::new(self.shards[bucket].read_arc()); + EdgeArcGuard { guard, offset } + } +} + +#[derive(Debug, Clone)] +pub struct EdgeArcGuard { + guard: Arc>, + offset: usize, +} + +impl EdgeArcGuard { + pub fn as_mem_edge(&self) -> MemEdge { + MemEdge::new(&self.guard, self.offset) + } +} + +pub struct EdgeWGuard<'a> { + guard: RwLockWriteGuard<'a, EdgeShard>, + i: usize, +} + +impl<'a> EdgeWGuard<'a> { + pub fn edge_store(&self) -> &EdgeStore { + &self.guard.edge_ids[self.i] + } + + pub fn edge_store_mut(&mut self) -> &mut EdgeStore { + &mut self.guard.edge_ids[self.i] + } + + pub fn deletions_mut(&mut self, layer_id: usize) -> &mut TimeIndex { + if layer_id >= self.guard.deletions.len() { + self.guard + .deletions + .resize_with(layer_id + 1, Default::default); + } + if self.i >= self.guard.deletions[layer_id].len() { + self.guard.deletions[layer_id].resize_with(self.i + 1, Default::default); + } + &mut self.guard.deletions[layer_id][self.i] + } + + pub fn additions_mut(&mut self, layer_id: usize) -> &mut TimeIndex { + if layer_id >= self.guard.additions.len() { + self.guard + .additions + .resize_with(layer_id + 1, Default::default); + } + if self.i >= self.guard.additions[layer_id].len() { + self.guard.additions[layer_id].resize_with(self.i + 1, Default::default); + } + &mut self.guard.additions[layer_id][self.i] + } + + pub fn layer_mut(&mut self, layer_id: usize) -> &mut EdgeLayer { + if layer_id >= self.guard.props.len() { + self.guard.props.resize_with(layer_id + 1, Default::default); + } + if self.i >= self.guard.props[layer_id].len() { + self.guard.props[layer_id].resize_with(self.i + 1, Default::default); + } + + &mut self.guard.props[layer_id][self.i] + } +} + +#[derive(Debug)] +pub struct EdgeRGuard<'a> { + guard: RwLockReadGuard<'a, EdgeShard>, + offset: usize, +} + +impl<'a> EdgeRGuard<'a> { + pub fn as_mem_edge(&self) -> MemEdge { + MemEdge::new(&self.guard, self.offset) + } + + pub fn has_layer(&self, layers: &LayerIds) -> bool { + self.as_mem_edge().has_layer(layers) + } + + pub fn layer_iter( + &self, + ) -> impl Iterator + '_)> + '_ { + self.guard.props_iter(self.offset) + } + + pub(crate) fn temp_prop_ids( + &self, + layer_id: Option, + ) -> Box + '_> { + if let Some(layer_id) = layer_id { + Box::new( + self.guard + .props(self.offset, layer_id) + .into_iter() + .flat_map(|layer| layer.temporal_prop_ids()), + ) + } else { + Box::new( + self.guard + .props_iter(self.offset) + .flat_map(|(_, layer)| layer.temporal_prop_ids()), + ) + } + } + + pub(crate) fn layer(&self, layer_id: usize) -> Option + '_> { + self.guard.props(self.offset, layer_id) + } +} + +#[derive(Debug)] +pub struct LockedEdges { + shards: Arc<[ArcRwLockReadGuard]>, + len: usize, +} + +impl LockedEdges { + pub fn get(&self, eid: EID) -> &EdgeShard { + let (bucket, offset) = resolve(eid.into(), self.shards.len()); + let shard = &self.shards[bucket]; + &shard + } + + pub fn get_mem(&self, eid: EID) -> MemEdge { + let (bucket, offset) = resolve(eid.into(), self.shards.len()); + MemEdge::new(&self.shards[bucket], offset) + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn iter(&self) -> impl Iterator + '_ { + self.shards.iter().flat_map(|shard| { + shard + .edge_ids + .iter() + .enumerate() + .map(move |(offset, _)| MemEdge::new(&shard, offset)) + }) + } + + pub fn par_iter(&self) -> impl ParallelIterator + '_ { + self.shards.par_iter().flat_map(|shard| { + shard + .edge_ids + .par_iter() + .enumerate() + .map(move |(offset, _)| MemEdge::new(&shard, offset)) + }) + } +} diff --git a/raphtory/src/core/utils/errors.rs b/raphtory/src/core/utils/errors.rs index d2a6a2dd54..ac789824aa 100644 --- a/raphtory/src/core/utils/errors.rs +++ b/raphtory/src/core/utils/errors.rs @@ -129,6 +129,9 @@ pub enum GraphError { #[error("The time function is only available once an edge has been exploded via .explode(). You may want to retrieve the history for this edge via .history(), or the earliest/latest time via earliest_time or latest_time")] TimeAPIError, + + #[error("Illegal set error {0}")] + IllegalSet(String), } impl GraphError { diff --git a/raphtory/src/core/utils/iter.rs b/raphtory/src/core/utils/iter.rs new file mode 100644 index 0000000000..c941e8d335 --- /dev/null +++ b/raphtory/src/core/utils/iter.rs @@ -0,0 +1,32 @@ +use ouroboros::self_referencing; + +#[self_referencing] +pub struct GenLockedIter<'a, O, OUT> { + owner: O, + #[borrows(owner)] + #[covariant] + iter: Box + Send + 'this>, + mark: std::marker::PhantomData<&'a O>, +} + +impl<'a, O, OUT> Iterator for GenLockedIter<'a, O, OUT> { + type Item = OUT; + + fn next(&mut self) -> Option { + self.with_iter_mut(|iter| iter.next()) + } +} + +impl<'a, O, OUT> GenLockedIter<'a, O, OUT> { + pub fn from<'b>( + owner: O, + iter_fn: impl FnOnce(&O) -> Box + Send + '_>, + ) -> Self { + GenLockedIterBuilder { + owner, + iter_builder: |owner| iter_fn(owner), + mark: std::marker::PhantomData, + } + .build() + } +} diff --git a/raphtory/src/core/utils/mod.rs b/raphtory/src/core/utils/mod.rs index f9a5a35317..03d3668889 100644 --- a/raphtory/src/core/utils/mod.rs +++ b/raphtory/src/core/utils/mod.rs @@ -1,2 +1,4 @@ pub mod errors; pub mod time; + +pub mod iter; diff --git a/raphtory/src/db/api/storage/edges/edge_entry.rs b/raphtory/src/db/api/storage/edges/edge_entry.rs index b32be169a4..533e27bbe3 100644 --- a/raphtory/src/db/api/storage/edges/edge_entry.rs +++ b/raphtory/src/db/api/storage/edges/edge_entry.rs @@ -1,28 +1,29 @@ +use std::ops::Range; + +use rayon::prelude::*; + +#[cfg(feature = "storage")] +use crate::disk_graph::storage_interface::edge::DiskEdge; use crate::{ core::{ - entities::{ - edges::{edge_ref::EdgeRef, edge_store::EdgeStore}, - LayerIds, VID, - }, - storage::Entry, + entities::{edges::edge_ref::EdgeRef, LayerIds, VID}, + storage::raw_edges::EdgeRGuard, }, - db::api::storage::edges::{ - edge_ref::EdgeStorageRef, - edge_storage_ops::{EdgeStorageOps, TimeIndexRef}, + db::api::storage::{ + edges::{ + edge_ref::EdgeStorageRef, + edge_storage_ops::{EdgeStorageOps, TimeIndexRef}, + }, + tprop_storage_ops::TPropOps, }, }; -#[cfg(feature = "storage")] -use crate::disk_graph::storage_interface::edge::DiskEdge; - -use crate::db::api::storage::tprop_storage_ops::TPropOps; -use rayon::prelude::*; -use std::ops::Range; +use super::edge_storage_ops::MemEdge; #[derive(Debug)] pub enum EdgeStorageEntry<'a> { - Mem(&'a EdgeStore), - Unlocked(Entry<'a, EdgeStore>), + Mem(MemEdge<'a>), + Unlocked(EdgeRGuard<'a>), #[cfg(feature = "storage")] Disk(DiskEdge<'a>), } @@ -31,8 +32,8 @@ impl<'a> EdgeStorageEntry<'a> { #[inline] pub fn as_ref(&self) -> EdgeStorageRef { match self { - EdgeStorageEntry::Mem(edge) => EdgeStorageRef::Mem(edge), - EdgeStorageEntry::Unlocked(edge) => EdgeStorageRef::Mem(edge), + EdgeStorageEntry::Mem(edge) => EdgeStorageRef::Mem(*edge), + EdgeStorageEntry::Unlocked(edge) => EdgeStorageRef::Mem(edge.as_mem_edge()), #[cfg(feature = "storage")] EdgeStorageEntry::Disk(edge) => EdgeStorageRef::Disk(*edge), } diff --git a/raphtory/src/db/api/storage/edges/edge_owned_entry.rs b/raphtory/src/db/api/storage/edges/edge_owned_entry.rs index 3b4536da19..750ec8bca5 100644 --- a/raphtory/src/db/api/storage/edges/edge_owned_entry.rs +++ b/raphtory/src/db/api/storage/edges/edge_owned_entry.rs @@ -1,14 +1,17 @@ +use std::ops::Range; + +use rayon::iter::ParallelIterator; + +use raphtory_api::core::storage::timeindex::TimeIndexEntry; + #[cfg(feature = "storage")] use crate::db::api::storage::variants::storage_variants::StorageVariants; #[cfg(feature = "storage")] use crate::disk_graph::storage_interface::edge::DiskOwnedEdge; use crate::{ core::{ - entities::{ - edges::{edge_ref::EdgeRef, edge_store::EdgeStore}, - LayerIds, VID, - }, - storage::ArcEntry, + entities::{edges::edge_ref::EdgeRef, LayerIds, VID}, + storage::raw_edges::EdgeArcGuard, }, db::api::storage::{ edges::{ @@ -18,13 +21,10 @@ use crate::{ tprop_storage_ops::TPropOps, }, }; -use raphtory_api::core::storage::timeindex::TimeIndexEntry; -use rayon::iter::ParallelIterator; -use std::ops::Range; #[derive(Debug, Clone)] pub enum EdgeOwnedEntry { - Mem(ArcEntry), + Mem(EdgeArcGuard), #[cfg(feature = "storage")] Disk(DiskOwnedEdge), } @@ -51,7 +51,7 @@ macro_rules! for_all_variants { impl EdgeOwnedEntry { pub fn as_ref(&self) -> EdgeStorageRef { match self { - EdgeOwnedEntry::Mem(entry) => EdgeStorageRef::Mem(entry), + EdgeOwnedEntry::Mem(entry) => EdgeStorageRef::Mem(entry.as_mem_edge()), #[cfg(feature = "storage")] EdgeOwnedEntry::Disk(entry) => EdgeStorageRef::Disk(entry.as_ref()), } diff --git a/raphtory/src/db/api/storage/edges/edge_ref.rs b/raphtory/src/db/api/storage/edges/edge_ref.rs index cf2fa4db53..1f2be867c8 100644 --- a/raphtory/src/db/api/storage/edges/edge_ref.rs +++ b/raphtory/src/db/api/storage/edges/edge_ref.rs @@ -1,19 +1,20 @@ +use std::ops::Range; + +use rayon::prelude::*; + #[cfg(feature = "storage")] use crate::db::api::storage::variants::storage_variants::StorageVariants; #[cfg(feature = "storage")] use crate::disk_graph::storage_interface::edge::DiskEdge; use crate::{ - core::entities::{ - edges::{edge_ref::EdgeRef, edge_store::EdgeStore}, - LayerIds, EID, VID, - }, + core::entities::{edges::edge_ref::EdgeRef, LayerIds, EID, VID}, db::api::storage::{ edges::edge_storage_ops::{EdgeStorageOps, TimeIndexRef}, tprop_storage_ops::TPropOps, }, }; -use rayon::prelude::*; -use std::ops::Range; + +use super::edge_storage_ops::MemEdge; macro_rules! for_all { ($value:expr, $pattern:pat => $result:expr) => { @@ -46,7 +47,7 @@ macro_rules! for_all_iter { #[derive(Copy, Clone, Debug)] pub enum EdgeStorageRef<'a> { - Mem(&'a EdgeStore), + Mem(MemEdge<'a>), #[cfg(feature = "storage")] Disk(DiskEdge<'a>), } @@ -55,7 +56,7 @@ impl<'a> EdgeStorageRef<'a> { #[inline] pub fn eid(&self) -> EID { match self { - EdgeStorageRef::Mem(e) => e.eid, + EdgeStorageRef::Mem(e) => e.eid(), #[cfg(feature = "storage")] EdgeStorageRef::Disk(e) => e.eid(), } diff --git a/raphtory/src/db/api/storage/edges/edge_storage_ops.rs b/raphtory/src/db/api/storage/edges/edge_storage_ops.rs index 40e96574f6..8457e4b037 100644 --- a/raphtory/src/db/api/storage/edges/edge_storage_ops.rs +++ b/raphtory/src/db/api/storage/edges/edge_storage_ops.rs @@ -13,10 +13,10 @@ use crate::{ use pometry_storage::timestamps::TimeStamps; use crate::{ - core::entities::properties::tprop::TProp, + core::{entities::properties::tprop::TProp, storage::raw_edges::EdgeShard}, db::api::storage::{tprop_storage_ops::TPropOps, variants::layer_variants::LayerVariants}, }; -use raphtory_api::core::storage::timeindex::TimeIndexEntry; +use raphtory_api::core::{entities::EID, storage::timeindex::TimeIndexEntry}; use rayon::prelude::*; use std::ops::Range; @@ -234,17 +234,80 @@ pub trait EdgeStorageOps<'a>: Copy + Sized + Send + Sync + 'a { } } -impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { +#[derive(Clone, Copy, Debug)] +pub struct MemEdge<'a> { + edges: &'a EdgeShard, + offset: usize, +} + +impl<'a> MemEdge<'a> { + pub fn new(edges: &'a EdgeShard, offset: usize) -> Self { + MemEdge { edges, offset } + } + + pub fn edge_store(&self) -> &EdgeStore { + self.edges.edge_store(self.offset) + } + + pub fn eid(self) -> EID { + self.edge_store().eid + } + + pub fn as_edge_ref(&self) -> EdgeRef { + EdgeRef::new_outgoing(self.eid(), self.src(), self.dst()) + } + + pub fn internal_num_layers(self) -> usize { + self.edges.internal_num_layers() + } + + fn get_additions(self, layer_id: usize) -> Option<&'a TimeIndex> { + self.edges.additions(self.offset, layer_id) + } + + fn get_deletions(self, layer_id: usize) -> Option<&'a TimeIndex> { + self.edges.deletions(self.offset, layer_id) + } + + pub fn has_layer_inner(self, layer_id: usize) -> bool { + self.get_additions(layer_id) + .filter(|t_index| !t_index.is_empty()) + .is_some() + || self + .get_deletions(layer_id) + .filter(|t_index| !t_index.is_empty()) + .is_some() + } + + pub fn temporal_prop_layer_inner(self, layer_id: usize, prop_id: usize) -> Option<&'a TProp> { + let layer = self.edges.props(self.offset, layer_id)?; + layer.temporal_property(prop_id) + } +} + +impl<'a> EdgeStorageOps<'a> for MemEdge<'a> { fn in_ref(self) -> EdgeRef { - EdgeRef::new_incoming(self.eid, self.src, self.dst) + EdgeRef::new_incoming(self.eid(), self.src(), self.dst()) } fn out_ref(self) -> EdgeRef { - EdgeRef::new_outgoing(self.eid, self.src, self.dst) + EdgeRef::new_outgoing(self.eid(), self.src(), self.dst()) } fn active(self, layer_ids: &LayerIds, w: Range) -> bool { - self.active(layer_ids, w) + match layer_ids { + LayerIds::None => false, + LayerIds::All => self + .additions_iter(layer_ids) + .any(|(_, t_index)| t_index.active_t(w.clone())), + LayerIds::One(l_id) => self + .get_additions(*l_id) + .filter(|a| a.active_t(w)) + .is_some(), + LayerIds::Multiple(layers) => layers + .iter() + .any(|l_id| self.active(&LayerIds::One(*l_id), w.clone())), + } } fn has_layer(self, layer_ids: &LayerIds) -> bool { @@ -257,25 +320,27 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { } fn src(self) -> VID { - self.src + self.edge_store().src } fn dst(self) -> VID { - self.dst + self.edge_store().dst } fn layer_ids_iter(self, layer_ids: &'a LayerIds) -> impl Iterator + 'a { match layer_ids { LayerIds::None => LayerVariants::None(std::iter::empty()), LayerIds::All => LayerVariants::All( - (0..self.internal_num_layers()).filter(|&l| self.has_layer_inner(l)), + (0..self.internal_num_layers()).filter(move |&l| self.has_layer_inner(l)), ), LayerIds::One(id) => { LayerVariants::One(self.has_layer_inner(*id).then_some(*id).into_iter()) } - LayerIds::Multiple(ids) => { - LayerVariants::Multiple(ids.iter().copied().filter(|&id| self.has_layer_inner(id))) - } + LayerIds::Multiple(ids) => LayerVariants::Multiple( + ids.iter() + .copied() + .filter(move |&id| self.has_layer_inner(id)), + ), } } @@ -288,7 +353,7 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { LayerIds::All => LayerVariants::All( (0..self.internal_num_layers()) .into_par_iter() - .filter(|&l| self.has_layer_inner(l)), + .filter(move |&l| self.has_layer_inner(l)), ), LayerIds::One(id) => { LayerVariants::One(self.has_layer_inner(*id).then_some(*id).into_par_iter()) @@ -296,7 +361,7 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { LayerIds::Multiple(ids) => LayerVariants::Multiple( ids.par_iter() .copied() - .filter(|&id| self.has_layer_inner(id)), + .filter(move |&id| self.has_layer_inner(id)), ), } } @@ -309,6 +374,7 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { TimeIndexRef::Ref(self.get_deletions(layer_id).unwrap_or(&TimeIndex::Empty)) } + #[inline(always)] fn temporal_prop_layer(self, layer_id: usize, prop_id: usize) -> impl TPropOps<'a> + 'a { self.temporal_prop_layer_inner(layer_id, prop_id) .unwrap_or(&TProp::Empty) diff --git a/raphtory/src/db/api/storage/edges/edges.rs b/raphtory/src/db/api/storage/edges/edges.rs index 7ded8af78c..7d99deb8d9 100644 --- a/raphtory/src/db/api/storage/edges/edges.rs +++ b/raphtory/src/db/api/storage/edges/edges.rs @@ -1,22 +1,17 @@ -use super::edge_entry::EdgeStorageEntry; +use super::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges}; +#[cfg(feature = "storage")] +use crate::disk_graph::storage_interface::{edges::DiskEdges, edges_ref::DiskEdgesRef}; use crate::{ - core::{ - entities::{edges::edge_store::EdgeStore, LayerIds, EID}, - storage::ReadLockedStorage, - }, + core::{entities::LayerIds, storage::raw_edges::LockedEdges}, db::api::storage::{ - edges::edge_storage_ops::EdgeStorageOps, nodes::unlocked::UnlockedEdges, - variants::storage_variants3::StorageVariants, + edges::edge_storage_ops::EdgeStorageOps, variants::storage_variants3::StorageVariants, }, }; use rayon::iter::ParallelIterator; use std::sync::Arc; -#[cfg(feature = "storage")] -use crate::disk_graph::storage_interface::{edges::DiskEdges, edges_ref::DiskEdgesRef}; - pub enum EdgesStorage { - Mem(Arc>), + Mem(Arc), #[cfg(feature = "storage")] Disk(DiskEdges), } @@ -34,7 +29,7 @@ impl EdgesStorage { #[derive(Debug)] pub enum EdgesStorageRef<'a> { - Mem(&'a ReadLockedStorage), + Mem(&'a LockedEdges), Unlocked(UnlockedEdges<'a>), #[cfg(feature = "storage")] Disk(DiskEdgesRef<'a>), @@ -74,7 +69,7 @@ impl<'a> EdgesStorageRef<'a> { EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked( edges .iter() - .filter(move |e| e.has_layer(&layers)) + .filter(move |e| e.as_mem_edge().has_layer(&layers)) .map(EdgeStorageEntry::Unlocked), ), } @@ -113,7 +108,7 @@ impl<'a> EdgesStorageRef<'a> { EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked( edges .par_iter() - .filter(move |e| e.has_layer(&layers)) + .filter(move |e| e.as_mem_edge().has_layer(&layers)) .map(EdgeStorageEntry::Unlocked), ), } @@ -130,7 +125,10 @@ impl<'a> EdgesStorageRef<'a> { EdgesStorageRef::Unlocked(edges) => match layers { LayerIds::None => 0, LayerIds::All => edges.len(), - _ => edges.par_iter().filter(|e| e.has_layer(layers)).count(), + _ => edges + .par_iter() + .filter(|e| e.as_mem_edge().has_layer(layers)) + .count(), }, #[cfg(feature = "storage")] EdgesStorageRef::Disk(storage) => storage.count(layers), diff --git a/raphtory/src/db/api/storage/edges/mod.rs b/raphtory/src/db/api/storage/edges/mod.rs index 2b16658360..be155e8413 100644 --- a/raphtory/src/db/api/storage/edges/mod.rs +++ b/raphtory/src/db/api/storage/edges/mod.rs @@ -3,3 +3,4 @@ pub mod edge_owned_entry; pub mod edge_ref; pub mod edge_storage_ops; pub mod edges; +pub mod unlocked; diff --git a/raphtory/src/db/api/storage/edges/unlocked.rs b/raphtory/src/db/api/storage/edges/unlocked.rs new file mode 100644 index 0000000000..cbc2902e89 --- /dev/null +++ b/raphtory/src/db/api/storage/edges/unlocked.rs @@ -0,0 +1,28 @@ +use crate::core::{entities::graph::tgraph_storage::GraphStorage, storage::raw_edges::EdgeRGuard}; +use raphtory_api::core::entities::EID; +use rayon::prelude::*; + +#[derive(Copy, Clone, Debug)] +pub struct UnlockedEdges<'a>(pub(crate) &'a GraphStorage); + +impl<'a> UnlockedEdges<'a> { + pub fn iter(self) -> impl Iterator> + 'a { + let storage = self.0; + (0..storage.edges_len()) + .map(EID) + .map(|eid| storage.edge_entry(eid)) + } + + pub fn par_iter(self) -> impl ParallelIterator> + 'a { + let storage = self.0; + (0..storage.edges_len()) + .into_par_iter() + .map(EID) + .map(|eid| storage.edge_entry(eid)) + } + + #[inline] + pub fn len(self) -> usize { + self.0.edges_len() + } +} diff --git a/raphtory/src/db/api/storage/locked.rs b/raphtory/src/db/api/storage/locked.rs index 8e8d5d2cc1..c9d1efb675 100644 --- a/raphtory/src/db/api/storage/locked.rs +++ b/raphtory/src/db/api/storage/locked.rs @@ -1,11 +1,21 @@ +use std::sync::Arc; + use crate::core::{ - entities::{edges::edge_store::EdgeStore, nodes::node_store::NodeStore, EID, VID}, - storage::ReadLockedStorage, + entities::{nodes::node_store::NodeStore, VID}, + storage::{raw_edges::LockedEdges, ReadLockedStorage}, }; -use std::sync::Arc; -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct LockedGraph { pub(crate) nodes: Arc>, - pub(crate) edges: Arc>, + pub(crate) edges: Arc, +} + +impl Clone for LockedGraph { + fn clone(&self) -> Self { + LockedGraph { + nodes: self.nodes.clone(), + edges: self.edges.clone(), + } + } } diff --git a/raphtory/src/db/api/storage/nodes/mod.rs b/raphtory/src/db/api/storage/nodes/mod.rs index 015bd0ee05..26081101c8 100644 --- a/raphtory/src/db/api/storage/nodes/mod.rs +++ b/raphtory/src/db/api/storage/nodes/mod.rs @@ -4,4 +4,3 @@ pub mod node_ref; pub mod node_storage_ops; pub mod nodes; pub mod nodes_ref; -pub mod unlocked; diff --git a/raphtory/src/db/api/storage/nodes/unlocked.rs b/raphtory/src/db/api/storage/nodes/unlocked.rs deleted file mode 100644 index 2217dcce21..0000000000 --- a/raphtory/src/db/api/storage/nodes/unlocked.rs +++ /dev/null @@ -1,115 +0,0 @@ -use crate::core::{ - entities::{ - edges::edge_store::EdgeStore, graph::tgraph::InternalGraph, nodes::node_store::NodeStore, - LayerIds, - }, - storage::{ArcEntry, Entry}, -}; -use ouroboros::self_referencing; -use raphtory_api::core::{ - entities::{edges::edge_ref::EdgeRef, EID, VID}, - Direction, -}; -use rayon::prelude::*; - -impl<'a> Entry<'a, NodeStore> { - pub fn into_edges_iter( - self, - layers: &'a LayerIds, - dir: Direction, - ) -> impl Iterator + 'a { - LockedEdgesRefIterBuilder { - entry: self, - iter_builder: |node| Box::new(node.edge_tuples(layers, dir)), - } - .build() - } -} - -#[self_referencing] -pub struct LockedEdgesRefIter<'a> { - entry: Entry<'a, NodeStore>, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl<'a> Iterator for LockedEdgesRefIter<'a> { - type Item = EdgeRef; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) - } -} - -#[derive(Clone, Copy, Debug)] -pub struct UnlockedNodes<'a>(pub &'a InternalGraph); - -impl<'a> UnlockedNodes<'a> { - pub fn len(self) -> usize { - self.0.inner().storage.nodes.len() - } - - pub fn node(&self, vid: VID) -> Entry<'a, NodeStore> { - self.0.inner().storage.nodes.entry(vid) - } - - pub fn iter(self) -> impl Iterator> + 'a { - let storage = &self.0.inner().storage.nodes; - (0..storage.len()).map(VID).map(|vid| storage.entry(vid)) - } - - pub fn par_iter(self) -> impl ParallelIterator> + 'a { - let storage = &self.0.inner().storage.nodes; - (0..storage.len()) - .into_par_iter() - .map(VID) - .map(|vid| storage.entry(vid)) - } -} - -#[derive(Debug, Clone)] -pub struct UnlockedOwnedNode { - g: InternalGraph, - vid: VID, -} - -impl UnlockedOwnedNode { - pub fn new(g: InternalGraph, vid: VID) -> Self { - Self { g, vid } - } - - pub fn arc_node(&self) -> ArcEntry { - self.g.inner().storage.nodes.entry_arc(self.vid) - } - - pub fn into_edges_iter( - self, - layers: LayerIds, - dir: Direction, - ) -> impl Iterator { - self.arc_node().into_edges(&layers, dir) - } -} - -#[derive(Copy, Clone, Debug)] -pub struct UnlockedEdges<'a>(pub &'a InternalGraph); - -impl<'a> UnlockedEdges<'a> { - pub fn iter(self) -> impl Iterator> + 'a { - let storage = &self.0.inner().storage.edges; - (0..storage.len()).map(EID).map(|eid| storage.entry(eid)) - } - - pub fn par_iter(self) -> impl ParallelIterator> + 'a { - let storage = &self.0.inner().storage.edges; - (0..storage.len()) - .into_par_iter() - .map(EID) - .map(|eid| storage.entry(eid)) - } - - pub fn len(self) -> usize { - self.0.inner().storage.edges.len() - } -} diff --git a/raphtory/src/db/api/storage/storage_ops.rs b/raphtory/src/db/api/storage/storage_ops.rs index 9001307010..a8cb1e2db4 100644 --- a/raphtory/src/db/api/storage/storage_ops.rs +++ b/raphtory/src/db/api/storage/storage_ops.rs @@ -42,8 +42,8 @@ use crate::{ use pometry_storage::graph::TemporalGraph; use super::{ - edges::edge_entry::EdgeStorageEntry, - nodes::{node_entry::NodeStorageEntry, unlocked::UnlockedEdges}, + edges::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges}, + nodes::node_entry::NodeStorageEntry, }; #[derive(Debug, Clone)] @@ -87,7 +87,7 @@ impl GraphStorage { match self { GraphStorage::Mem(storage) => NodeStorageEntry::Mem(storage.nodes.get(vid)), GraphStorage::Unlocked(storage) => { - NodeStorageEntry::Unlocked(storage.inner().node_entry(vid)) + NodeStorageEntry::Unlocked(storage.inner().storage.get_node(vid)) } #[cfg(feature = "storage")] GraphStorage::Disk(storage) => NodeStorageEntry::Disk(DiskNode::new(storage, vid)), @@ -110,7 +110,9 @@ impl GraphStorage { pub fn edges(&self) -> EdgesStorageRef { match self { GraphStorage::Mem(storage) => EdgesStorageRef::Mem(&storage.edges), - GraphStorage::Unlocked(storage) => EdgesStorageRef::Unlocked(UnlockedEdges(storage)), + GraphStorage::Unlocked(storage) => { + EdgesStorageRef::Unlocked(UnlockedEdges(&storage.inner().storage)) + } #[cfg(feature = "storage")] GraphStorage::Disk(storage) => EdgesStorageRef::Disk(DiskEdgesRef::new(storage)), } @@ -127,9 +129,9 @@ impl GraphStorage { pub fn edge(&self, eid: EdgeRef) -> EdgeStorageEntry { match self { - GraphStorage::Mem(storage) => EdgeStorageEntry::Mem(storage.edges.get(eid.pid())), + GraphStorage::Mem(storage) => EdgeStorageEntry::Mem(storage.edges.get_mem(eid.pid())), GraphStorage::Unlocked(storage) => { - EdgeStorageEntry::Unlocked(storage.inner().edge_entry(eid.pid())) + EdgeStorageEntry::Unlocked(storage.inner().storage.edge_entry(eid.pid())) } #[cfg(feature = "storage")] GraphStorage::Disk(storage) => { @@ -226,15 +228,10 @@ impl GraphStorage { ) -> impl ParallelIterator + 'graph { view.node_list().into_par_iter().filter(move |&vid| { let node = self.node(vid); - let n = node.name(); - let i = node.node_type_id(); let r = type_filter .as_ref() .map_or(true, |type_filter| type_filter[node.node_type_id()]); let s = view.filter_node(self.node(vid).as_ref(), view.layer_ids()); - - println!("name = {:?}, id = {}, r = {}, s = {}", n, i, r, s); - r && s }) } @@ -280,25 +277,25 @@ impl GraphStorage { EdgesStorage::Mem(edges) => { let iter = (0..edges.len()).map(EID); let filtered = match view.filter_state() { - FilterState::Neither => { - FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref())) - } + FilterState::Neither => FilterVariants::Neither( + iter.map(move |eid| edges.get_mem(eid).as_edge_ref()), + ), FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); (view.filter_edge(e, view.layer_ids()) && view.filter_node(nodes.node_entry(e.src()), view.layer_ids()) && view.filter_node(nodes.node_entry(e.dst()), view.layer_ids())) .then(|| e.out_ref()) })), FilterState::Nodes => FilterVariants::Nodes(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); (view.filter_node(nodes.node_entry(e.src()), view.layer_ids()) && view.filter_node(nodes.node_entry(e.dst()), view.layer_ids())) .then(|| e.out_ref()) })), FilterState::Edges | FilterState::BothIndependent => { FilterVariants::Edges(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); view.filter_edge(e, view.layer_ids()).then(|| e.out_ref()) })) } @@ -407,25 +404,25 @@ impl GraphStorage { EdgesStorage::Mem(edges) => { let iter = (0..edges.len()).into_par_iter().map(EID); let filtered = match view.filter_state() { - FilterState::Neither => { - FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref())) - } + FilterState::Neither => FilterVariants::Neither( + iter.map(move |eid| edges.get_mem(eid).as_edge_ref()), + ), FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); (view.filter_edge(e, view.layer_ids()) && view.filter_node(nodes.node_entry(e.src()), view.layer_ids()) && view.filter_node(nodes.node_entry(e.dst()), view.layer_ids())) .then(|| e.out_ref()) })), FilterState::Nodes => FilterVariants::Nodes(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); (view.filter_node(nodes.node_entry(e.src()), view.layer_ids()) && view.filter_node(nodes.node_entry(e.dst()), view.layer_ids())) .then(|| e.out_ref()) })), FilterState::Edges | FilterState::BothIndependent => { FilterVariants::Edges(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); view.filter_edge(e, view.layer_ids()).then(|| e.out_ref()) })) } diff --git a/raphtory/src/db/internal/core_ops.rs b/raphtory/src/db/internal/core_ops.rs index b8b50926d9..780e385572 100644 --- a/raphtory/src/db/internal/core_ops.rs +++ b/raphtory/src/db/internal/core_ops.rs @@ -1,7 +1,7 @@ use crate::{ core::{ entities::{ - edges::edge_ref::EdgeRef, + edges::{edge_ref::EdgeRef, edge_store::EdgeDataLike}, graph::tgraph::InternalGraph, nodes::node_ref::NodeRef, properties::{graph_meta::GraphMeta, props::Meta, tprop::TProp}, @@ -124,7 +124,7 @@ impl CoreGraphOps for InternalGraph { #[inline] fn constant_node_prop(&self, v: VID, prop_id: usize) -> Option { - let entry = self.inner().node_entry(v); + let entry = self.inner().storage.get_node(v); entry.const_prop(prop_id).cloned() } @@ -133,7 +133,8 @@ impl CoreGraphOps for InternalGraph { // FIXME: revisit the locking scheme so we don't have to collect the ids Box::new( self.inner() - .node_entry(v) + .storage + .get_node(v) .const_prop_ids() .collect_vec() .into_iter(), @@ -145,7 +146,8 @@ impl CoreGraphOps for InternalGraph { // FIXME: revisit the locking scheme so we don't have to collect the ids Box::new( self.inner() - .node_entry(v) + .storage + .get_node(v) .temporal_prop_ids() .collect_vec() .into_iter(), @@ -154,7 +156,7 @@ impl CoreGraphOps for InternalGraph { fn get_const_edge_prop(&self, e: EdgeRef, prop_id: usize, layer_ids: LayerIds) -> Option { let layer_ids = layer_ids.constrain_from_edge(e); - let entry = self.inner().edge_entry(e.pid()); + let entry = self.inner().storage.edge_entry(e.pid()); match layer_ids { LayerIds::None => None, LayerIds::All => { @@ -163,14 +165,12 @@ impl CoreGraphOps for InternalGraph { entry .layer_iter() .next() - .and_then(|data| data.layer.const_prop(prop_id).cloned()) + .and_then(|(_, data)| data.const_prop(prop_id).cloned()) } else { let prop_map: HashMap<_, _> = entry .layer_iter() - .enumerate() .flat_map(|(id, data)| { - data.layer - .const_prop(prop_id) + data.const_prop(prop_id) .map(|p| (self.inner().get_layer_name(id), p.clone())) }) .collect(); @@ -186,9 +186,8 @@ impl CoreGraphOps for InternalGraph { let prop_map: HashMap<_, _> = ids .iter() .flat_map(|&id| { - entry.layer(id).and_then(|layer| { - layer - .const_prop(prop_id) + entry.layer(id).and_then(|data| { + data.const_prop(prop_id) .map(|p| (self.inner().get_layer_name(id), p.clone())) }) }) @@ -207,14 +206,14 @@ impl CoreGraphOps for InternalGraph { e: EdgeRef, layer_ids: LayerIds, ) -> Box + '_> { - // FIXME: revisit the locking scheme so we don't have to collect all the ids + // // FIXME: revisit the locking scheme so we don't have to collect all the ids let layer_ids = layer_ids.constrain_from_edge(e); - let entry = self.inner().edge_entry(e.pid()); + let entry = self.inner().storage.edge_entry(e.pid()); let ids: Vec<_> = match layer_ids { LayerIds::None => vec![], LayerIds::All => entry .layer_iter() - .map(|data| data.layer.const_prop_ids()) + .map(|(_, data)| data.const_prop_ids()) .kmerge() .dedup() .collect(), @@ -237,8 +236,8 @@ impl CoreGraphOps for InternalGraph { e: EdgeRef, layer_ids: &LayerIds, ) -> Box + '_> { - // FIXME: revisit the locking scheme so we don't have to collect the ids - let entry = self.inner().edge_entry(e.pid()); + // // FIXME: revisit the locking scheme so we don't have to collect the ids + let entry = self.inner().storage.edge_entry(e.pid()); match layer_ids { LayerIds::None => Box::new(iter::empty()), LayerIds::All => Box::new(entry.temp_prop_ids(None).collect_vec().into_iter()), @@ -256,17 +255,17 @@ impl CoreGraphOps for InternalGraph { #[inline] fn core_edges(&self) -> EdgesStorage { - EdgesStorage::Mem(Arc::new(self.inner().storage.edges.read_lock())) + EdgesStorage::Mem(Arc::new(self.inner().storage.edges_read_lock())) } #[inline] fn core_nodes(&self) -> NodesStorage { - NodesStorage::Mem(Arc::new(self.inner().storage.nodes.read_lock())) + NodesStorage::Mem(Arc::new(self.inner().storage.nodes_read_lock())) } #[inline] fn core_edge(&self, eid: ELID) -> EdgeStorageEntry { - EdgeStorageEntry::Unlocked(self.inner().storage.edges.entry(eid.pid())) + EdgeStorageEntry::Unlocked(self.inner().storage.edge_entry(eid.pid())) } #[inline] @@ -279,12 +278,12 @@ impl CoreGraphOps for InternalGraph { } fn core_edge_arc(&self, eid: ELID) -> EdgeOwnedEntry { - EdgeOwnedEntry::Mem(self.inner().storage.edges.entry_arc(eid.pid())) + EdgeOwnedEntry::Mem(self.inner().storage.get_edge_arc(eid.pid())) } #[inline] fn unfiltered_num_edges(&self) -> usize { - self.inner().storage.edges.len() + self.inner().storage.edges_len() } } diff --git a/raphtory/src/db/internal/list_ops.rs b/raphtory/src/db/internal/list_ops.rs index 3be8e16cf6..ccbdefd7d5 100644 --- a/raphtory/src/db/internal/list_ops.rs +++ b/raphtory/src/db/internal/list_ops.rs @@ -6,13 +6,13 @@ use crate::{ impl ListOps for InternalGraph { fn node_list(&self) -> NodeList { NodeList::All { - num_nodes: self.inner().storage.nodes.len(), + num_nodes: self.inner().storage.nodes_len(), } } fn edge_list(&self) -> EdgeList { EdgeList::All { - num_edges: self.inner().storage.edges.len(), + num_edges: self.inner().storage.edges_len(), } } } diff --git a/raphtory/src/db/internal/prop_add.rs b/raphtory/src/db/internal/prop_add.rs index 0c2ac9611f..d98a10aab6 100644 --- a/raphtory/src/db/internal/prop_add.rs +++ b/raphtory/src/db/internal/prop_add.rs @@ -68,7 +68,7 @@ impl InternalPropertyAdditionOps for InternalGraph { props: Vec<(usize, Prop)>, ) -> Result<(), GraphError> { let mut edge = self.inner().storage.get_edge_mut(eid); - let mut edge_layer = edge.layer_mut(layer); + let edge_layer = edge.layer_mut(layer); for (prop_id, value) in props { edge_layer .add_constant_prop(prop_id, value) @@ -93,7 +93,7 @@ impl InternalPropertyAdditionOps for InternalGraph { props: Vec<(usize, Prop)>, ) -> Result<(), GraphError> { let mut edge = self.inner().storage.get_edge_mut(eid); - let mut edge_layer = edge.layer_mut(layer); + let edge_layer = edge.layer_mut(layer); for (prop_id, value) in props { edge_layer.update_constant_prop(prop_id, value)?; } diff --git a/raphtory/src/db/internal/time_semantics.rs b/raphtory/src/db/internal/time_semantics.rs index 732a669cbe..e2a0fe4f4d 100644 --- a/raphtory/src/db/internal/time_semantics.rs +++ b/raphtory/src/db/internal/time_semantics.rs @@ -26,11 +26,11 @@ use std::ops::Range; impl TimeSemantics for InternalGraph { fn node_earliest_time(&self, v: VID) -> Option { - self.inner().node_entry(v).value().timestamps().first_t() + self.inner().storage.get_node(v).timestamps().first_t() } fn node_latest_time(&self, v: VID) -> Option { - self.inner().node_entry(v).value().timestamps().last_t() + self.inner().storage.get_node(v).timestamps().last_t() } fn view_start(&self) -> Option { @@ -71,8 +71,8 @@ impl TimeSemantics for InternalGraph { fn node_earliest_time_window(&self, v: VID, start: i64, end: i64) -> Option { self.inner() - .node_entry(v) - .value() + .storage + .get_node(v) .timestamps() .range_t(start..end) .first_t() @@ -80,8 +80,8 @@ impl TimeSemantics for InternalGraph { fn node_latest_time_window(&self, v: VID, start: i64, end: i64) -> Option { self.inner() - .node_entry(v) - .value() + .storage + .get_node(v) .timestamps() .range_t(start..end) .last_t() @@ -158,7 +158,7 @@ impl TimeSemantics for InternalGraph { } fn edge_exploded(&self, e: EdgeRef, layer_ids: &LayerIds) -> BoxedIter { - let entry = self.inner().storage.edges.entry_arc(e.pid()); + let entry = self.inner().storage.get_edge_arc(e.pid()); entry.into_exploded(layer_ids.clone(), e).into_dyn_boxed() } @@ -308,17 +308,17 @@ impl TimeSemantics for InternalGraph { } fn has_temporal_node_prop(&self, v: VID, prop_id: usize) -> bool { - let entry = self.inner().storage.nodes.get(v); + let entry = self.inner().storage.nodes.entry(v); entry.temporal_property(prop_id).is_some() } fn temporal_node_prop_vec(&self, v: VID, prop_id: usize) -> Vec<(i64, Prop)> { - let node = self.inner().storage.nodes.get(v); + let node = self.inner().storage.nodes.entry(v); node.temporal_properties(prop_id, None).collect() } fn has_temporal_node_prop_window(&self, v: VID, prop_id: usize, w: Range) -> bool { - let entry = self.inner().storage.nodes.get(v); + let entry = self.inner().storage.nodes.entry(v); entry .temporal_property(prop_id) .filter(|p| p.iter_window_t(w).next().is_some()) @@ -335,7 +335,7 @@ impl TimeSemantics for InternalGraph { self.inner() .storage .nodes - .get(v) + .entry(v) .temporal_properties(prop_id, Some(start..end)) .collect() } diff --git a/raphtory/src/lib.rs b/raphtory/src/lib.rs index 1121df1def..607392ce80 100644 --- a/raphtory/src/lib.rs +++ b/raphtory/src/lib.rs @@ -125,7 +125,7 @@ pub mod prelude { pub use raphtory_api::core::input::input_node::InputNode; } -pub const BINCODE_VERSION: u32 = 1u32; +pub const BINCODE_VERSION: u32 = 2u32; #[cfg(feature = "storage")] pub use polars_arrow as arrow2;