From 1732326eef26c9bf31be717335fb35e9d515120c Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Thu, 20 Jun 2024 12:25:02 +0100 Subject: [PATCH] Refactor the graph storage to reduce memory usage --- Cargo.lock | 1 + pometry-storage-private | 2 +- raphtory-benchmark/Cargo.toml | 1 + raphtory-benchmark/benches/graph_ops.rs | 8 +- .../src/core/entities/edges/edge_store.rs | 373 +++--------------- raphtory/src/core/entities/graph/edges.rs | 77 ++++ raphtory/src/core/entities/graph/mod.rs | 3 +- raphtory/src/core/entities/graph/tgraph.rs | 30 +- .../src/core/entities/graph/tgraph_storage.rs | 47 ++- .../src/core/entities/nodes/node_store.rs | 112 ++---- .../src/core/entities/properties/props.rs | 2 +- raphtory/src/core/storage/lazy_vec.rs | 6 +- raphtory/src/core/storage/mod.rs | 62 ++- raphtory/src/core/storage/raw_edges.rs | 305 ++++++++++++++ raphtory/src/core/storage/timeindex.rs | 83 ++++ raphtory/src/core/utils/errors.rs | 3 + raphtory/src/core/utils/iter.rs | 32 ++ raphtory/src/core/utils/mod.rs | 2 + .../src/db/api/storage/edges/edge_entry.rs | 37 +- .../db/api/storage/edges/edge_owned_entry.rs | 20 +- raphtory/src/db/api/storage/edges/edge_ref.rs | 17 +- .../db/api/storage/edges/edge_storage_ops.rs | 125 +++++- raphtory/src/db/api/storage/edges/edges.rs | 50 +-- raphtory/src/db/api/storage/locked.rs | 18 +- raphtory/src/db/api/storage/nodes/unlocked.rs | 114 +----- raphtory/src/db/api/storage/storage_ops.rs | 13 +- .../src/db/api/storage/tprop_storage_ops.rs | 45 ++- raphtory/src/db/internal/core_ops.rs | 43 +- raphtory/src/db/internal/list_ops.rs | 4 +- raphtory/src/db/internal/prop_add.rs | 4 +- raphtory/src/db/internal/time_semantics.rs | 30 +- 31 files changed, 993 insertions(+), 676 deletions(-) create mode 100644 raphtory/src/core/entities/graph/edges.rs create mode 100644 raphtory/src/core/storage/raw_edges.rs create mode 100644 raphtory/src/core/utils/iter.rs diff --git a/Cargo.lock b/Cargo.lock index 4cac904ad5..02d0c7e334 100644 --- a/Cargo.lock +++ 
b/Cargo.lock @@ -4465,6 +4465,7 @@ dependencies = [ "pometry-storage", "rand 0.8.5", "raphtory", + "raphtory-api", "raphtory-graphql", "rayon", "sorted_vector_map", diff --git a/pometry-storage-private b/pometry-storage-private index 7a99bad373..daf72073f4 160000 --- a/pometry-storage-private +++ b/pometry-storage-private @@ -1 +1 @@ -Subproject commit 7a99bad37344cbbf985b842d989771edea135387 +Subproject commit daf72073f4a3dbdc3824b50ac15f3399f0686bb3 diff --git a/raphtory-benchmark/Cargo.toml b/raphtory-benchmark/Cargo.toml index f9f773c1a4..af908f34bc 100644 --- a/raphtory-benchmark/Cargo.toml +++ b/raphtory-benchmark/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] criterion = { workspace = true } raphtory = { path = "../raphtory", features = ["io"] } +raphtory-api = { path = "../raphtory-api" } raphtory-graphql = { path = "../raphtory-graphql", version = "0.9.3" } pometry-storage.workspace = true sorted_vector_map = { workspace = true } diff --git a/raphtory-benchmark/benches/graph_ops.rs b/raphtory-benchmark/benches/graph_ops.rs index 9aee4f630c..3ce0cb0efb 100644 --- a/raphtory-benchmark/benches/graph_ops.rs +++ b/raphtory-benchmark/benches/graph_ops.rs @@ -1,13 +1,11 @@ use common::run_graph_ops_benches; use criterion::{criterion_group, criterion_main, Criterion}; use raphtory::{ - core::utils::hashing::calculate_hash, - graph_loader::{ - example::sx_superuser_graph::{sx_superuser_file, sx_superuser_graph, TEdge}, - source::csv_loader::CsvLoader, - }, + graph_loader::sx_superuser_graph::{sx_superuser_file, sx_superuser_graph, TEdge}, + io::csv_loader::CsvLoader, prelude::*, }; +use raphtory_api::core::utils::hashing::calculate_hash; mod common; diff --git a/raphtory/src/core/entities/edges/edge_store.rs b/raphtory/src/core/entities/edges/edge_store.rs index edb1fea95b..cb2376f026 100644 --- a/raphtory/src/core/entities/edges/edge_store.rs +++ b/raphtory/src/core/entities/edges/edge_store.rs @@ -6,42 +6,57 @@ use crate::{ }, storage::{ 
lazy_vec::IllegalSet, - timeindex::{TimeIndex, TimeIndexEntry, TimeIndexIntoOps, TimeIndexOps}, - ArcEntry, + raw_edges::EdgeArcGuard, + timeindex::{TimeIndexEntry, TimeIndexIntoOps}, }, - utils::errors::GraphError, + utils::{errors::GraphError, iter::GenLockedIter}, Prop, }, db::api::{ storage::edges::edge_storage_ops::{EdgeStorageIntoOps, EdgeStorageOps}, - view::{BoxedLIter, IntoDynBoxed}, + view::IntoDynBoxed, }, }; - +use itertools::Itertools; use raphtory_api::core::entities::edges::edge_ref::EdgeRef; pub use raphtory_api::core::entities::edges::*; - -use itertools::{EitherOrBoth, Itertools}; -use ouroboros::self_referencing; use serde::{Deserialize, Serialize}; -use std::{ - iter, - ops::{DerefMut, Range}, -}; +use std::ops::{Deref, Range}; -#[derive(Serialize, Deserialize, Debug, Default, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Debug, Default, PartialEq)] pub struct EdgeStore { pub(crate) eid: EID, pub(crate) src: VID, pub(crate) dst: VID, - pub(crate) data: Vec, } -#[derive(Serialize, Deserialize, Debug, Default, PartialEq)] -pub struct EdgeData { - pub(crate) layer: EdgeLayer, - pub(crate) additions: TimeIndex, - pub(crate) deletions: TimeIndex, +pub trait EdgeDataLike<'a> { + fn temporal_prop_ids(self) -> impl Iterator + 'a; + fn const_prop_ids(self) -> impl Iterator + 'a; +} + +impl<'a, T: Deref + 'a> EdgeDataLike<'a> for T { + fn temporal_prop_ids(self) -> impl Iterator + 'a { + GenLockedIter::from(self, |layer| { + Box::new( + layer + .props() + .into_iter() + .flat_map(|props| props.temporal_prop_ids()), + ) + }) + } + + fn const_prop_ids(self) -> impl Iterator + 'a { + GenLockedIter::from(self, |layer| { + Box::new( + layer + .props() + .into_iter() + .flat_map(|props| props.const_prop_ids()), + ) + }) + } } #[derive(Serialize, Deserialize, Debug, Default, PartialEq)] @@ -54,6 +69,10 @@ impl EdgeLayer { self.props.as_ref() } + pub fn into_props(self) -> Option { + self.props + } + pub fn add_prop( &mut self, t: TimeIndexEntry, @@ -78,13 
+97,6 @@ impl EdgeLayer { props.update_constant_prop(prop_id, prop) } - pub(crate) fn const_prop_ids(&self) -> impl Iterator + '_ { - self.props - .as_ref() - .into_iter() - .flat_map(|props| props.const_prop_ids()) - } - pub(crate) fn const_prop(&self, prop_id: usize) -> Option<&Prop> { self.props.as_ref().and_then(|ps| ps.const_prop(prop_id)) } @@ -95,270 +107,33 @@ impl EdgeLayer { } impl EdgeStore { - pub fn as_edge_ref(&self) -> EdgeRef { - EdgeRef::new_outgoing(self.eid, self.src, self.dst) - } - - pub fn internal_num_layers(&self) -> usize { - self.data.len() - } - - fn get_or_allocate_layer(&mut self, layer_id: usize) -> &mut EdgeLayer { - if self.data.len() <= layer_id { - self.data.resize_with(layer_id + 1, Default::default); - } - &mut self.data[layer_id].layer - } - - pub fn has_layer_inner(&self, layer_id: usize) -> bool { - self.get_additions(layer_id) - .filter(|t_index| !t_index.is_empty()) - .is_some() - || self - .get_deletions(layer_id) - .filter(|t_index| !t_index.is_empty()) - .is_some() - } - - pub fn layer_iter(&self) -> impl Iterator + '_ { - self.data.iter() - } - - /// Iterate over (layer_id, additions, deletions) triplets for edge - pub fn updates_iter_inner<'a>( - &'a self, - layers: &'a LayerIds, - ) -> impl Iterator< - Item = ( - usize, - &'a TimeIndex, - &'a TimeIndex, - ), - > + 'a { - match layers { - LayerIds::None => Box::new(iter::empty()), - LayerIds::All => self - .additions_iter_inner(layers) - .zip_longest(self.deletions_iter_inner(layers)) - .enumerate() - .map(|(l, zipped)| match zipped { - EitherOrBoth::Both(additions, deletions) => (l, additions, deletions), - EitherOrBoth::Left(additions) => (l, additions, &TimeIndex::Empty), - EitherOrBoth::Right(deletions) => (l, &TimeIndex::Empty, deletions), - }) - .into_dyn_boxed(), - LayerIds::One(id) => Box::new(iter::once(( - *id, - self.get_additions(*id).unwrap_or(&TimeIndex::Empty), - self.get_deletions(*id).unwrap_or(&TimeIndex::Empty), - ))), - LayerIds::Multiple(ids) => 
Box::new(ids.iter().map(|id| { - ( - *id, - self.get_additions(*id).unwrap_or(&TimeIndex::Empty), - self.get_deletions(*id).unwrap_or(&TimeIndex::Empty), - ) - })), - } - } - - pub fn additions_iter_inner<'a>( - &'a self, - layers: &'a LayerIds, - ) -> BoxedLIter<'a, &TimeIndex> { - match layers { - LayerIds::None => iter::empty().into_dyn_boxed(), - LayerIds::All => self.iter_additions().into_dyn_boxed(), - LayerIds::One(id) => self.get_additions(*id).into_iter().into_dyn_boxed(), - LayerIds::Multiple(ids) => ids - .iter() - .flat_map(|id| self.get_additions(*id)) - .into_dyn_boxed(), - } - } - - pub fn deletions_iter_inner<'a>( - &'a self, - layers: &'a LayerIds, - ) -> BoxedLIter<'a, &TimeIndex> { - match layers { - LayerIds::None => iter::empty().into_dyn_boxed(), - LayerIds::All => self.iter_deletions().into_dyn_boxed(), - LayerIds::One(id) => self.get_deletions(*id).into_iter().into_dyn_boxed(), - LayerIds::Multiple(ids) => ids - .iter() - .flat_map(|id| self.get_deletions(*id)) - .into_dyn_boxed(), - } - } - - pub fn layer_ids_window_iter(&self, w: Range) -> impl Iterator + '_ { - let layer_ids = self - .iter_additions() - .enumerate() - .zip_longest(self.iter_deletions().enumerate()) - .flat_map(move |e| match e { - EitherOrBoth::Both((i, t1), (_, t2)) => { - if t1.contains(w.clone()) || t2.contains(w.clone()) { - Some(i) - } else { - None - } - } - EitherOrBoth::Left((i, t)) => { - if t.contains(w.clone()) { - Some(i) - } else { - None - } - } - EitherOrBoth::Right((i, t)) => { - if t.contains(w.clone()) { - Some(i) - } else { - None - } - } - }); - - layer_ids - } - pub fn new(src: VID, dst: VID) -> Self { Self { eid: 0.into(), src, dst, - data: Vec::with_capacity(1), } } - pub fn layer(&self, layer_id: usize) -> Option<&EdgeLayer> { - self.data.get(layer_id).map(|data| &data.layer) - } - - /// an edge is active in a window if it has an addition event in any of the layers - pub fn active(&self, layer_ids: &LayerIds, w: Range) -> bool { - match layer_ids { 
- LayerIds::None => false, - LayerIds::All => self - .iter_additions() - .any(|t_index| t_index.contains(w.clone())), - LayerIds::One(l_id) => self - .get_additions(*l_id) - .map(|t_index| t_index.contains(w)) - .unwrap_or(false), - LayerIds::Multiple(layers) => layers - .iter() - .any(|l_id| self.active(&LayerIds::One(*l_id), w.clone())), - } - } - - pub fn last_deletion(&self, layer_ids: &LayerIds) -> Option { - match layer_ids { - LayerIds::None => None, - LayerIds::All => self.iter_deletions().flat_map(|d| d.last()).max(), - LayerIds::One(id) => self.get_deletions(*id).and_then(|t| t.last()), - LayerIds::Multiple(ids) => ids - .iter() - .flat_map(|id| self.get_deletions(*id).and_then(|t| t.last())) - .max(), - } - } - - pub fn last_addition(&self, layer_ids: &LayerIds) -> Option { - match layer_ids { - LayerIds::None => None, - LayerIds::All => self.iter_additions().flat_map(|d| d.last()).max(), - LayerIds::One(id) => self.get_additions(*id).and_then(|t| t.last()), - LayerIds::Multiple(ids) => ids - .iter() - .flat_map(|id| self.get_additions(*id).and_then(|t| t.last())) - .max(), - } - } - - pub fn temporal_prop_layer_inner(&self, layer_id: usize, prop_id: usize) -> Option<&TProp> { - self.data - .get(layer_id) - .and_then(|layer| layer.layer.temporal_property(prop_id)) - } - - pub fn layer_mut(&mut self, layer_id: usize) -> impl DerefMut + '_ { - self.get_or_allocate_layer(layer_id) - } - - pub fn deletions_mut(&mut self, layer_id: usize) -> &mut TimeIndex { - if self.data.len() <= layer_id { - self.data.resize_with(layer_id + 1, Default::default); - } - &mut self.data[layer_id].deletions - } - - pub fn additions_mut(&mut self, layer_id: usize) -> &mut TimeIndex { - if self.data.len() <= layer_id { - self.data.resize_with(layer_id + 1, Default::default); - } - &mut self.data[layer_id].additions - } - - pub(crate) fn temp_prop_ids( - &self, - layer_id: Option, - ) -> Box + '_> { - if let Some(layer_id) = layer_id { - 
Box::new(self.data.get(layer_id).into_iter().flat_map(|layer| { - layer - .layer - .props() - .into_iter() - .flat_map(|props| props.temporal_prop_ids()) - })) - } else { - Box::new( - self.data - .iter() - .flat_map(|layer| layer.layer.props().map(|prop| prop.temporal_prop_ids())) - .kmerge() - .dedup(), - ) - } - } - - pub fn get_additions(&self, layer_id: usize) -> Option<&TimeIndex> { - self.data.get(layer_id).map(|data| &data.additions) - } - - pub fn get_deletions(&self, layer_id: usize) -> Option<&TimeIndex> { - self.data.get(layer_id).map(|data| &data.deletions) - } - - pub fn iter_additions(&self) -> impl Iterator> + '_ { - self.data.iter().map(|data| &data.additions) - } - - pub fn iter_deletions(&self) -> impl Iterator> + '_ { - self.data.iter().map(|data| &data.deletions) + pub fn as_edge_ref(&self) -> EdgeRef { + EdgeRef::new_outgoing(self.eid, self.src, self.dst) } } -impl EdgeStorageIntoOps for ArcEntry { +impl EdgeStorageIntoOps for EdgeArcGuard { fn into_layers( self, layer_ids: LayerIds, eref: EdgeRef, ) -> impl Iterator + Send { let layer_ids = layer_ids.constrain_from_edge(eref); - ExplodedIterBuilder { - entry: self, - layer_ids, - iter_builder: move |edge, layer_ids| { - edge.layer_ids_iter(layer_ids) - .map(move |l| eref.at_layer(l)) - .into_dyn_boxed() - }, - } - .build() + GenLockedIter::from((self, layer_ids), |(edge, layers)| { + Box::new( + edge.as_mem_edge() + .layer_ids_iter(layers) + .map(move |l| eref.at_layer(l)), + ) + }) } fn into_exploded( @@ -367,17 +142,13 @@ impl EdgeStorageIntoOps for ArcEntry { eref: EdgeRef, ) -> impl Iterator + Send { let layer_ids = layer_ids.constrain_from_edge(eref); - ExplodedIterBuilder { - entry: self, - layer_ids, - iter_builder: move |edge, layers| { - edge.additions_iter(layers) - .map(move |(l, a)| a.into_iter().map(move |t| eref.at(t).at_layer(l))) - .kmerge_by(|e1, e2| e1.time() <= e2.time()) - .into_dyn_boxed() - }, - } - .build() + GenLockedIter::from((self, layer_ids, eref), |(edge, layers, 
eref)| { + edge.as_mem_edge() + .additions_iter(layers) + .map(move |(l, a)| a.into_iter().map(move |t| eref.at(t).at_layer(l))) + .kmerge_by(|e1, e2| e1.time() <= e2.time()) + .into_dyn_boxed() + }) } fn into_exploded_window( @@ -387,36 +158,16 @@ impl EdgeStorageIntoOps for ArcEntry { eref: EdgeRef, ) -> impl Iterator + Send { let layer_ids = layer_ids.constrain_from_edge(eref); - ExplodedIterBuilder { - entry: self, - layer_ids, - iter_builder: move |edge, layers| { - edge.additions_iter(layers) + GenLockedIter::from((self, layer_ids, w), |(edge, layers, w)| { + Box::new( + edge.as_mem_edge() + .additions_iter(layers) .flat_map(move |(l, a)| { a.into_range(w.clone()) .into_iter() .map(move |t| eref.at(t).at_layer(l)) - }) - .into_dyn_boxed() - }, - } - .build() - } -} - -#[self_referencing] -pub struct ExplodedIter { - entry: ArcEntry, - layer_ids: LayerIds, - #[borrows(entry, layer_ids)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl Iterator for ExplodedIter { - type Item = EdgeRef; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) + }), + ) + }) } } diff --git a/raphtory/src/core/entities/graph/edges.rs b/raphtory/src/core/entities/graph/edges.rs new file mode 100644 index 0000000000..86daad8037 --- /dev/null +++ b/raphtory/src/core/entities/graph/edges.rs @@ -0,0 +1,77 @@ +use crate::{ + core::{ + entities::edges::edge_store::EdgeStore, + storage::raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard, EdgesShard, LockedEdgesShard}, + }, + db::api::storage::edges::edge_storage_ops::MemEdge, +}; +use raphtory_api::core::entities::EID; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub(crate) struct EdgesStorage { + edges: EdgesShard, +} + +#[derive(Debug)] +pub struct LockedEdges { + edges: LockedEdgesShard, +} + +impl LockedEdges { + pub fn iter(&self) -> impl Iterator + '_ { + self.edges.iter() + } + + pub fn par_iter(&self) -> impl rayon::iter::ParallelIterator + '_ { + 
self.edges.par_iter() + } + + #[inline] + pub fn get(&self, id: EID) -> MemEdge { + self.edges.get_mem(id) + } + + pub fn len(&self) -> usize { + self.edges.len() + } +} + +impl EdgesStorage { + pub(crate) fn new() -> Self { + Self { + edges: EdgesShard::new(), + } + } + + pub fn read_lock(&self) -> LockedEdges { + LockedEdges { + edges: self.edges.lock(), + } + } + + pub(crate) fn len(&self) -> usize { + self.edges.len() + } + + pub(crate) fn push(&self, edge: EdgeStore) -> EdgeWGuard { + let (eid, mut edge) = self.edges.push(edge); + edge.edge_store_mut().eid = eid; + edge + } + + #[inline] + pub(crate) fn entry(&self, id: EID) -> EdgeRGuard { + self.edges.get_edge(id) + } + + #[inline] + pub(crate) fn entry_mut(&self, id: EID) -> EdgeWGuard { + self.edges.get_edge_mut(id) + } + + #[inline] + pub(crate) fn entry_arc(&self, id: EID) -> EdgeArcGuard { + self.edges.get_edge_arc(id) + } +} diff --git a/raphtory/src/core/entities/graph/mod.rs b/raphtory/src/core/entities/graph/mod.rs index 3923ab8a20..64575837f6 100644 --- a/raphtory/src/core/entities/graph/mod.rs +++ b/raphtory/src/core/entities/graph/mod.rs @@ -1,3 +1,4 @@ +pub mod edges; pub mod tgraph; pub mod tgraph_storage; pub(crate) mod timer; @@ -38,7 +39,7 @@ mod test { ) .unwrap(); - let first = g.inner().storage.nodes.get(v1); + let first = g.inner().storage.nodes.entry(v1); let ns = first .neighbours(&vec![l_btc, l_eth].into(), Direction::OUT) diff --git a/raphtory/src/core/entities/graph/tgraph.rs b/raphtory/src/core/entities/graph/tgraph.rs index 3f1a7625ab..e216433f3a 100644 --- a/raphtory/src/core/entities/graph/tgraph.rs +++ b/raphtory/src/core/entities/graph/tgraph.rs @@ -12,8 +12,9 @@ use crate::{ }, storage::{ locked_view::LockedView, + raw_edges::EdgeWGuard, timeindex::{AsTime, TimeIndexEntry}, - Entry, EntryMut, + EntryMut, }, utils::errors::GraphError, Direction, Prop, @@ -52,8 +53,8 @@ impl InternalGraph { } pub(crate) fn lock(&self) -> LockedGraph { - let nodes = 
Arc::new(self.inner().storage.nodes.read_lock()); - let edges = Arc::new(self.inner().storage.edges.read_lock()); + let nodes = Arc::new(self.inner().storage.nodes_read_lock()); + let edges = Arc::new(self.inner().storage.edges_read_lock()); LockedGraph { nodes, edges } } @@ -105,8 +106,8 @@ impl std::fmt::Display for InternalGraph { write!( f, "Graph(num_nodes={}, num_edges={})", - self.inner().storage.nodes.len(), - self.inner().storage.edges.len() + self.inner().storage.nodes_len(), + self.inner().storage.edges_len() ) } } @@ -260,16 +261,6 @@ impl TemporalGraph { pub(crate) fn get_all_node_types(&self) -> Vec { self.node_meta.get_all_node_types() } - - #[inline] - pub(crate) fn node_entry(&self, v: VID) -> Entry<'_, NodeStore> { - self.storage.get_node(v) - } - - #[inline] - pub(crate) fn edge_entry(&self, e: EID) -> Entry<'_, EdgeStore> { - self.storage.get_edge(e) - } } impl TemporalGraph { @@ -417,7 +408,7 @@ impl TemporalGraph { Ok(()) } - fn link_nodes Result<(), GraphError>>( + fn link_nodes Result<(), GraphError>>( &self, src_id: VID, dst_id: VID, @@ -436,9 +427,10 @@ impl TemporalGraph { edge_id } None => { - let mut edge = EdgeStore::new(src_id, dst_id); + let mut edge = self.storage.push_edge(EdgeStore::new(src_id, dst_id)); + let eid = edge.edge_store().eid; edge_fn(&mut edge)?; - self.storage.push_edge(edge) + eid } }; @@ -461,7 +453,7 @@ impl TemporalGraph { // get the entries for the src and dst nodes self.link_nodes(src_id, dst_id, t, layer, move |edge| { edge.additions_mut(layer).insert(t); - let mut edge_layer = edge.layer_mut(layer); + let edge_layer = edge.layer_mut(layer); for (prop_id, prop_value) in props { edge_layer.add_prop(t, prop_id, prop_value)?; } diff --git a/raphtory/src/core/entities/graph/tgraph_storage.rs b/raphtory/src/core/entities/graph/tgraph_storage.rs index 58f17beee4..4b8024df72 100644 --- a/raphtory/src/core/entities/graph/tgraph_storage.rs +++ b/raphtory/src/core/entities/graph/tgraph_storage.rs @@ -1,6 +1,11 @@ +use 
super::edges::{EdgesStorage, LockedEdges}; use crate::core::{ entities::{edges::edge_store::EdgeStore, nodes::node_store::NodeStore, EID, VID}, - storage::{self, Entry, EntryMut, PairEntryMut}, + storage::{ + self, + raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard}, + Entry, EntryMut, PairEntryMut, + }, }; use serde::{Deserialize, Serialize}; @@ -9,28 +14,42 @@ pub(crate) struct GraphStorage { // node storage with having (id, time_index, properties, adj list for each layer) pub(crate) nodes: storage::RawStorage, - // edge storage with having (src, dst, time_index, properties) for each layer - pub(crate) edges: storage::RawStorage, + edges: EdgesStorage, } impl GraphStorage { pub(crate) fn new(num_locks: usize) -> Self { Self { nodes: storage::RawStorage::new(num_locks), - edges: storage::RawStorage::new(num_locks), + edges: EdgesStorage::new(), } } + pub fn nodes_read_lock(&self) -> storage::ReadLockedStorage { + self.nodes.read_lock() + } + + pub fn edges_read_lock(&self) -> LockedEdges { + self.edges.read_lock() + } + + pub fn nodes_len(&self) -> usize { + self.nodes.len() + } + + pub fn edges_len(&self) -> usize { + self.edges.len() + } + + #[inline] pub(crate) fn push_node(&self, node: NodeStore) -> VID { self.nodes .push(node, |vid, node| node.vid = vid.into()) .into() } - - pub(crate) fn push_edge(&self, edge: EdgeStore) -> EID { - self.edges - .push(edge, |eid, edge| edge.eid = eid.into()) - .into() + #[inline] + pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard { + self.edges.push(edge) } #[inline] @@ -39,7 +58,7 @@ impl GraphStorage { } #[inline] - pub(crate) fn get_edge_mut(&self, id: EID) -> EntryMut<'_, EdgeStore> { + pub(crate) fn get_edge_mut(&self, id: EID) -> EdgeWGuard { self.edges.entry_mut(id) } @@ -49,10 +68,16 @@ impl GraphStorage { } #[inline] - pub(crate) fn get_edge(&self, id: EID) -> Entry<'_, EdgeStore> { + pub(crate) fn edge_entry(&self, id: EID) -> EdgeRGuard { self.edges.entry(id) } + #[inline] + pub(crate) fn 
get_edge_arc(&self, id: EID) -> EdgeArcGuard { + self.edges.entry_arc(id) + } + + #[inline] pub(crate) fn pair_node_mut(&self, i: VID, j: VID) -> PairEntryMut<'_, NodeStore> { self.nodes.pair_entry_mut(i, j) } diff --git a/raphtory/src/core/entities/nodes/node_store.rs b/raphtory/src/core/entities/nodes/node_store.rs index 7b9ae2e969..38408b1041 100644 --- a/raphtory/src/core/entities/nodes/node_store.rs +++ b/raphtory/src/core/entities/nodes/node_store.rs @@ -10,11 +10,10 @@ use crate::core::{ timeindex::{AsTime, TimeIndex, TimeIndexEntry}, ArcEntry, Entry, }, - utils::errors::GraphError, + utils::{errors::GraphError, iter::GenLockedIter}, Direction, Prop, }; use itertools::Itertools; -use ouroboros::self_referencing; use serde::{Deserialize, Serialize}; use std::{ iter, @@ -350,20 +349,12 @@ impl NodeStore { } impl ArcEntry { - pub fn into_edges(self, layers: &LayerIds, dir: Direction) -> LockedAdjIter { - LockedAdjIterBuilder { - entry: self, - iter_builder: |node| node.edge_tuples(layers, dir), - } - .build() + pub fn into_edges(self, layers: &LayerIds, dir: Direction) -> impl Iterator { + GenLockedIter::from(self, |node| node.edge_tuples(layers, dir)) } - pub fn into_neighbours(self, layers: &LayerIds, dir: Direction) -> LockedNeighboursIter { - LockedNeighboursIterBuilder { - entry: self, - iter_builder: |node| node.neighbours(layers, dir), - } - .build() + pub fn into_neighbours(self, layers: &LayerIds, dir: Direction) -> impl Iterator { + GenLockedIter::from(self, |node| node.neighbours(layers, dir)) } pub fn into_layers(self) -> LockedLayers { @@ -384,85 +375,28 @@ impl ArcEntry { } impl<'a> Entry<'a, NodeStore> { - pub fn into_neighbours(self, layers: &LayerIds, dir: Direction) -> LockedRefNeighboursIter<'a> { - LockedRefNeighboursIterBuilder { - entry: self, - iter_builder: |node| node.neighbours(layers, dir), - } - .build() - } - - pub fn into_edges(self, layers: &LayerIds, dir: Direction) -> LockedRefEdgesIter<'a> { - LockedRefEdgesIterBuilder { - 
entry: self, - iter_builder: |node| node.edge_tuples(layers, dir), - } - .build() - } -} - -#[self_referencing] -pub struct LockedAdjIter { - entry: ArcEntry, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl Iterator for LockedAdjIter { - type Item = EdgeRef; - - #[inline] - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) - } -} - -#[self_referencing] -pub struct LockedNeighboursIter { - entry: ArcEntry, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl Iterator for LockedNeighboursIter { - type Item = VID; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) + pub fn into_neighbours( + self, + layers: &LayerIds, + dir: Direction, + ) -> impl Iterator + 'a { + GenLockedIter::from(self, |node| node.neighbours(layers, dir)) } -} -#[self_referencing] -pub struct LockedRefNeighboursIter<'a> { - entry: Entry<'a, NodeStore>, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl<'a> Iterator for LockedRefNeighboursIter<'a> { - type Item = VID; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) + pub fn into_edges( + self, + layers: &LayerIds, + dir: Direction, + ) -> impl Iterator + 'a { + GenLockedIter::from(self, |node| node.edge_tuples(layers, dir)) } -} -#[self_referencing] -pub struct LockedRefEdgesIter<'a> { - entry: Entry<'a, NodeStore>, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl<'a> Iterator for LockedRefEdgesIter<'a> { - type Item = EdgeRef; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) + pub fn into_edges_iter( + self, + layers: &'a LayerIds, + dir: Direction, + ) -> impl Iterator + 'a { + GenLockedIter::from(self, |node| Box::new(node.edge_tuples(layers, dir))) } } diff --git a/raphtory/src/core/entities/properties/props.rs b/raphtory/src/core/entities/properties/props.rs index 9da6ab3625..12009daef5 100644 --- a/raphtory/src/core/entities/properties/props.rs 
+++ b/raphtory/src/core/entities/properties/props.rs @@ -98,7 +98,7 @@ impl Props { self.constant_props.filled_ids() } - pub fn temporal_prop_ids(&self) -> impl Iterator + '_ { + pub fn temporal_prop_ids(&self) -> impl Iterator + Send + '_ { self.temporal_props.filled_ids() } } diff --git a/raphtory/src/core/storage/lazy_vec.rs b/raphtory/src/core/storage/lazy_vec.rs index f422c84642..15a3586586 100644 --- a/raphtory/src/core/storage/lazy_vec.rs +++ b/raphtory/src/core/storage/lazy_vec.rs @@ -31,13 +31,13 @@ pub(crate) enum LazyVec { impl LazyVec where - A: PartialEq + Default + Clone + Debug, + A: PartialEq + Default + Clone + Debug + Send + Sync, { pub(crate) fn from(id: usize, value: A) -> Self { LazyVec::LazyVec1(id, value) } - pub(crate) fn filled_ids(&self) -> Box + '_> { + pub(crate) fn filled_ids(&self) -> Box + Send + '_> { match self { LazyVec::Empty => Box::new(iter::empty()), LazyVec::LazyVec1(id, _) => Box::new(iter::once(*id)), @@ -127,7 +127,7 @@ where let mut value = A::default(); updater(&mut value)?; self.set(id, value) - .expect("Set failed over a non existing value") + .map_err(|e| GraphError::IllegalSet(e.to_string()))?; } }; Ok(()) diff --git a/raphtory/src/core/storage/mod.rs b/raphtory/src/core/storage/mod.rs index 218454320d..2c71edc6f8 100644 --- a/raphtory/src/core/storage/mod.rs +++ b/raphtory/src/core/storage/mod.rs @@ -3,6 +3,7 @@ pub(crate) mod iter; pub mod lazy_vec; pub mod locked_view; +pub mod raw_edges; pub mod sorted_vec_map; pub mod timeindex; @@ -79,7 +80,7 @@ impl PartialEq for RawStorage { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ReadLockedStorage { pub(crate) locks: Vec>>>, len: usize, @@ -108,6 +109,12 @@ where &bucket[offset] } + pub(crate) fn get_opt(&self, index: Index) -> Option<&T> { + let (bucket, offset) = self.resolve(index); + let bucket = self.locks.get(bucket)?; + bucket.get(offset) + } + pub(crate) fn arc_entry(&self, index: Index) -> ArcEntry { let (bucket, offset) = self.resolve(index); 
ArcEntry { @@ -218,6 +225,17 @@ where index } + pub fn insert(&self, index: Index, value: T) { + let index: usize = index.into(); + let (bucket, offset) = self.resolve(index); + let mut vec = self.data[bucket].data.write(); + if offset >= vec.len() { + vec.resize_with(offset + 1, || Default::default()); + } + vec[offset] = value; + self.len.fetch_max(index + 1, Ordering::Relaxed); + } + #[inline] pub fn entry(&self, index: Index) -> Entry<'_, T> { let index = index.into(); @@ -226,12 +244,24 @@ where Entry { offset, guard } } + pub fn entry_opt(&self, index: Index) -> Option> { + let index = index.into(); + let (bucket, offset) = self.resolve(index); + let bucket = self.data.get(bucket)?; + let guard = bucket.data.read_recursive(); + if guard.get(offset).is_some() { + Some(Entry { offset, guard }) + } else { + None + } + } + #[inline] - pub fn get(&self, index: Index) -> impl Deref + '_ { + pub fn has_entry(&self, index: Index) -> bool { let index = index.into(); let (bucket, offset) = self.resolve(index); let guard = self.data[bucket].data.read_recursive(); - RwLockReadGuard::map(guard, |guard| &guard[offset]) + guard.get(offset).is_some() } pub fn entry_arc(&self, index: Index) -> ArcEntry { @@ -249,7 +279,7 @@ where let index = index.into(); let (bucket, offset) = self.resolve(index); let guard = self.data[bucket].data.write(); - EntryMut { i: offset, guard } + EntryMut { i: offset, guard }.ensure_exists() } // This helps get the right locks when adding an edge @@ -344,11 +374,18 @@ impl<'a, T> Entry<'a, T> { &self.guard[self.offset] } - pub fn map &U>(self, f: F) -> LockedView<'a, U> { - let mapped_guard = RwLockReadGuard::map(self.guard, |guard| { - let what = &guard[self.offset]; + pub fn map_guard &U>( + entry: Self, + f: F, + ) -> lock_api::MappedRwLockReadGuard<'a, parking_lot::RawRwLock, U> { + RwLockReadGuard::map(entry.guard, |guard| { + let what = &guard[entry.offset]; f(what) - }); + }) + } + + pub fn map &U>(entry: Self, f: F) -> LockedView<'a, U> { 
+ let mapped_guard = Self::map_guard(entry, f); LockedView::LockMapped(mapped_guard) } } @@ -396,6 +433,15 @@ pub struct EntryMut<'a, T: 'static> { guard: parking_lot::RwLockWriteGuard<'a, Vec>, } +impl<'a, T: Default + 'static> EntryMut<'a, T> { + pub fn ensure_exists(mut self) -> Self { + if self.guard.len() <= self.i { + self.guard.resize_with(self.i + 1, Default::default); + } + self + } +} + impl<'a, T> Deref for EntryMut<'a, T> { type Target = T; diff --git a/raphtory/src/core/storage/raw_edges.rs b/raphtory/src/core/storage/raw_edges.rs new file mode 100644 index 0000000000..1e4878ad78 --- /dev/null +++ b/raphtory/src/core/storage/raw_edges.rs @@ -0,0 +1,305 @@ +use std::{ + ops::Deref, + sync::{ + atomic::{self, AtomicUsize}, + Arc, + }, +}; + +use lock_api::ArcRwLockReadGuard; +use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use rayon::prelude::*; +use serde::{Deserialize, Serialize}; + +use raphtory_api::core::{entities::EID, storage::timeindex::TimeIndexEntry}; + +use crate::{ + core::entities::{ + edges::edge_store::{EdgeDataLike, EdgeLayer, EdgeStore}, + LayerIds, + }, + db::api::storage::edges::edge_storage_ops::{EdgeStorageOps, MemEdge}, +}; + +use super::{resolve, timeindex::TimeIndex}; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub struct EdgeShard { + edge_ids: Vec, + props: Vec>, + additions: Vec>>, + deletions: Vec>>, +} + +impl EdgeShard { + pub fn insert(&mut self, index: usize, value: EdgeStore) { + if index >= self.edge_ids.len() { + self.edge_ids.resize_with(index + 1, Default::default); + } + self.edge_ids[index] = value; + } + + pub fn edge_store(&self, index: usize) -> &EdgeStore { + &self.edge_ids[index] + } + + pub fn internal_num_layers(&self) -> usize { + self.additions.len().max(self.deletions.len()) + } + + pub fn additions(&self, index: usize, layer_id: usize) -> Option<&TimeIndex> { + self.additions.get(layer_id).and_then(|add| add.get(index)) + } + + pub fn deletions(&self, index: usize, layer_id: 
usize) -> Option<&TimeIndex> { + self.deletions.get(layer_id).and_then(|del| del.get(index)) + } + + pub fn props(&self, index: usize, layer_id: usize) -> Option<&EdgeLayer> { + self.props[layer_id].get(index) + } + + pub fn props_iter(&self, index: usize) -> impl Iterator { + self.props + .iter() + .enumerate() + .filter_map(move |(id, layer)| layer.get(index).map(|l| (id, l))) + } +} + +pub const SHARD_SIZE: usize = 64; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EdgesShard { + shards: Arc<[Arc>]>, + len: Arc, +} + +impl PartialEq for EdgesShard { + fn eq(&self, other: &Self) -> bool { + self.shards.len() == other.shards.len() + && self + .shards + .iter() + .zip(other.shards.iter()) + .all(|(a, b)| a.read().eq(&b.read())) + } +} + +impl EdgesShard { + pub fn new() -> Self { + let mut shards = (0..SHARD_SIZE).into_iter().map(|_| { + Arc::new(RwLock::new(EdgeShard { + edge_ids: vec![], + props: Vec::with_capacity(1), + additions: Vec::with_capacity(1), + deletions: Vec::with_capacity(0), + })) + }); + EdgesShard { + shards: shards.collect(), + len: Arc::new(AtomicUsize::new(0)), + } + } + + pub fn len(&self) -> usize { + self.len.load(atomic::Ordering::SeqCst) + } + + pub fn lock(&self) -> LockedEdgesShard { + LockedEdgesShard { + shards: self.shards.iter().map(|shard| shard.read_arc()).collect(), + len: self.len(), + } + } + + #[inline] + fn resolve(&self, index: usize) -> (usize, usize) { + resolve(index, self.shards.len()) + } + + pub fn push(&self, mut value: EdgeStore) -> (EID, EdgeWGuard) { + let index = self.len.fetch_add(1, atomic::Ordering::SeqCst); + let (bucket, offset) = self.resolve(index); + let mut shard = self.shards[bucket].write(); + shard.insert(offset, value); + let guard = EdgeWGuard { + guard: shard, + i: offset, + }; + (index.into(), guard) + } + + pub fn get_edge_mut(&self, eid: EID) -> EdgeWGuard { + let (bucket, offset) = self.resolve(eid.into()); + EdgeWGuard { + guard: self.shards[bucket].write(), + i: offset, + } + } 
+ + pub fn get_edge(&self, eid: EID) -> EdgeRGuard { + let (bucket, offset) = self.resolve(eid.into()); + EdgeRGuard { + guard: self.shards[bucket].read(), + offset, + } + } + + pub fn get_edge_arc(&self, eid: EID) -> EdgeArcGuard { + let (bucket, offset) = self.resolve(eid.into()); + let guard = Arc::new(self.shards[bucket].read_arc()); + EdgeArcGuard { guard, offset } + } +} + +#[derive(Debug, Clone)] +pub struct EdgeArcGuard { + guard: Arc>, + offset: usize, +} + +impl EdgeArcGuard { + pub fn as_mem_edge(&self) -> MemEdge { + MemEdge::new(&self.guard, self.offset) + } +} + +pub struct EdgeWGuard<'a> { + guard: RwLockWriteGuard<'a, EdgeShard>, + i: usize, +} + +impl<'a> EdgeWGuard<'a> { + pub fn edge_store(&self) -> &EdgeStore { + &self.guard.edge_ids[self.i] + } + + pub fn edge_store_mut(&mut self) -> &mut EdgeStore { + &mut self.guard.edge_ids[self.i] + } + + pub fn deletions_mut(&mut self, layer_id: usize) -> &mut TimeIndex { + if layer_id >= self.guard.deletions.len() { + self.guard + .deletions + .resize_with(layer_id + 1, Default::default); + } + if self.i >= self.guard.deletions[layer_id].len() { + self.guard.deletions[layer_id].resize_with(self.i + 1, Default::default); + } + &mut self.guard.deletions[layer_id][self.i] + } + + pub fn additions_mut(&mut self, layer_id: usize) -> &mut TimeIndex { + if layer_id >= self.guard.additions.len() { + self.guard + .additions + .resize_with(layer_id + 1, Default::default); + } + if self.i >= self.guard.additions[layer_id].len() { + self.guard.additions[layer_id].resize_with(self.i + 1, Default::default); + } + &mut self.guard.additions[layer_id][self.i] + } + + pub fn layer_mut(&mut self, layer_id: usize) -> &mut EdgeLayer { + if layer_id >= self.guard.props.len() { + self.guard.props.resize_with(layer_id + 1, Default::default); + } + if self.i >= self.guard.props[layer_id].len() { + self.guard.props[layer_id].resize_with(self.i + 1, Default::default); + } + + &mut self.guard.props[layer_id][self.i] + } +} + 
+#[derive(Debug)] +pub struct EdgeRGuard<'a> { + guard: RwLockReadGuard<'a, EdgeShard>, + offset: usize, +} + +impl<'a> EdgeRGuard<'a> { + pub fn as_mem_edge(&self) -> MemEdge { + MemEdge::new(&self.guard, self.offset) + } + + pub fn has_layer(&self, layers: &LayerIds) -> bool { + self.as_mem_edge().has_layer(layers) + } + + pub fn layer_iter( + &self, + ) -> impl Iterator + '_)> + '_ { + self.guard.props_iter(self.offset) + } + + pub(crate) fn temp_prop_ids( + &self, + layer_id: Option, + ) -> Box + '_> { + if let Some(layer_id) = layer_id { + Box::new( + self.guard + .props(self.offset, layer_id) + .into_iter() + .flat_map(|layer| layer.temporal_prop_ids()), + ) + } else { + Box::new( + self.guard + .props_iter(self.offset) + .flat_map(|(_, layer)| layer.temporal_prop_ids()), + ) + } + } + + pub(crate) fn layer(&self, layer_id: usize) -> Option + '_> { + self.guard.props(self.offset, layer_id) + } +} + +#[derive(Debug)] +pub struct LockedEdgesShard { + shards: Arc<[ArcRwLockReadGuard]>, + len: usize, +} + +impl LockedEdgesShard { + pub fn get(&self, eid: EID) -> &EdgeShard { + let (bucket, offset) = resolve(eid.into(), self.shards.len()); + let shard = &self.shards[bucket]; + &shard + } + + pub fn get_mem(&self, eid: EID) -> MemEdge { + let (bucket, offset) = resolve(eid.into(), self.shards.len()); + MemEdge::new(&self.shards[bucket], offset) + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn iter(&self) -> impl Iterator + '_ { + self.shards.iter().flat_map(|shard| { + shard + .edge_ids + .iter() + .enumerate() + .map(move |(offset, _)| MemEdge::new(&shard, offset)) + }) + } + + pub fn par_iter(&self) -> impl ParallelIterator + '_ { + self.shards.par_iter().flat_map(|shard| { + shard + .edge_ids + .par_iter() + .enumerate() + .map(move |(offset, _)| MemEdge::new(&shard, offset)) + }) + } +} diff --git a/raphtory/src/core/storage/timeindex.rs b/raphtory/src/core/storage/timeindex.rs index f673eb2611..05c574ef8f 100644 --- 
a/raphtory/src/core/storage/timeindex.rs +++ b/raphtory/src/core/storage/timeindex.rs @@ -18,6 +18,7 @@ use std::{ ops::{Deref, Range}, }; +use crate::core::utils::iter::GenLockedIter; pub use raphtory_api::core::storage::timeindex::*; #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -541,3 +542,85 @@ impl<'a, Ops: TimeIndexOps + 'a> TimeIndexOps for LayeredTimeIndexWindow<'a, Ops self.timeindex.par_iter().map(|ts| ts.len()).sum() } } + +impl<'a, T: AsTime> TimeIndexIntoOps for LockedView<'a, TimeIndex> { + type IndexType = T; + + type RangeType = LockedTimeIndexWindow<'a, T>; + + fn into_range(self, w: Range) -> Self::RangeType { + LockedTimeIndexWindow { + timeindex: self, + range: w, + } + } + + fn into_iter(self) -> impl Iterator + Send { + GenLockedIter::from(self, |t| t.iter()) + } +} + +#[derive(Debug)] +pub struct LockedTimeIndexWindow<'a, T: AsTime> { + timeindex: LockedView<'a, TimeIndex>, + range: Range, +} + +impl<'a, T: AsTime> TimeIndexIntoOps for LockedTimeIndexWindow<'a, T> { + type IndexType = T; + type RangeType = Self; + + fn into_range(self, w: Range) -> Self::RangeType { + let start = w.start.max(self.range.start); + let end = w.end.min(self.range.end); + LockedTimeIndexWindow { + timeindex: self.timeindex, + range: start..end, + } + } + + fn into_iter(self) -> impl Iterator + Send { + GenLockedIter::from(self.timeindex, |t| t.range_iter_forward(self.range.clone())) + } +} + +impl<'a, T: AsTime> TimeIndexOps for LockedTimeIndexWindow<'a, T> { + type IndexType = T; + type RangeType<'b> = TimeIndexWindow<'b, T> where Self: 'b; + + fn active(&self, w: Range) -> bool { + let Self { timeindex, range } = self; + w.start < range.end + && w.end > range.start + && (timeindex.active(max(w.start, range.start)..min(w.end, range.end))) + } + + fn range(&self, w: Range) -> Self::RangeType<'_> { + let Self { timeindex, range } = self; + let start = max(range.start, w.start); + let end = min(range.end, w.end); + 
TimeIndexWindow::TimeIndexRange { + timeindex: timeindex.deref(), + range: start..end, + } + } + + fn first(&self) -> Option { + let Self { timeindex, range } = self; + timeindex.range_iter(range.clone()).next() + } + + fn last(&self) -> Option { + let Self { timeindex, range } = self; + timeindex.range_iter(range.clone()).next_back() + } + + fn iter(&self) -> Box + Send + '_> { + self.timeindex.range_iter_forward(self.range.clone()) + } + + fn len(&self) -> usize { + let Self { timeindex, range } = self; + timeindex.range_iter(range.clone()).count() + } +} diff --git a/raphtory/src/core/utils/errors.rs b/raphtory/src/core/utils/errors.rs index d2a6a2dd54..ac789824aa 100644 --- a/raphtory/src/core/utils/errors.rs +++ b/raphtory/src/core/utils/errors.rs @@ -129,6 +129,9 @@ pub enum GraphError { #[error("The time function is only available once an edge has been exploded via .explode(). You may want to retrieve the history for this edge via .history(), or the earliest/latest time via earliest_time or latest_time")] TimeAPIError, + + #[error("Illegal set error {0}")] + IllegalSet(String), } impl GraphError { diff --git a/raphtory/src/core/utils/iter.rs b/raphtory/src/core/utils/iter.rs new file mode 100644 index 0000000000..c941e8d335 --- /dev/null +++ b/raphtory/src/core/utils/iter.rs @@ -0,0 +1,32 @@ +use ouroboros::self_referencing; + +#[self_referencing] +pub struct GenLockedIter<'a, O, OUT> { + owner: O, + #[borrows(owner)] + #[covariant] + iter: Box + Send + 'this>, + mark: std::marker::PhantomData<&'a O>, +} + +impl<'a, O, OUT> Iterator for GenLockedIter<'a, O, OUT> { + type Item = OUT; + + fn next(&mut self) -> Option { + self.with_iter_mut(|iter| iter.next()) + } +} + +impl<'a, O, OUT> GenLockedIter<'a, O, OUT> { + pub fn from<'b>( + owner: O, + iter_fn: impl FnOnce(&O) -> Box + Send + '_>, + ) -> Self { + GenLockedIterBuilder { + owner, + iter_builder: |owner| iter_fn(owner), + mark: std::marker::PhantomData, + } + .build() + } +} diff --git a/raphtory/src/core/utils/mod.rs 
b/raphtory/src/core/utils/mod.rs index f9a5a35317..03d3668889 100644 --- a/raphtory/src/core/utils/mod.rs +++ b/raphtory/src/core/utils/mod.rs @@ -1,2 +1,4 @@ pub mod errors; pub mod time; + +pub mod iter; diff --git a/raphtory/src/db/api/storage/edges/edge_entry.rs b/raphtory/src/db/api/storage/edges/edge_entry.rs index b32be169a4..533e27bbe3 100644 --- a/raphtory/src/db/api/storage/edges/edge_entry.rs +++ b/raphtory/src/db/api/storage/edges/edge_entry.rs @@ -1,28 +1,29 @@ +use std::ops::Range; + +use rayon::prelude::*; + +#[cfg(feature = "storage")] +use crate::disk_graph::storage_interface::edge::DiskEdge; use crate::{ core::{ - entities::{ - edges::{edge_ref::EdgeRef, edge_store::EdgeStore}, - LayerIds, VID, - }, - storage::Entry, + entities::{edges::edge_ref::EdgeRef, LayerIds, VID}, + storage::raw_edges::EdgeRGuard, }, - db::api::storage::edges::{ - edge_ref::EdgeStorageRef, - edge_storage_ops::{EdgeStorageOps, TimeIndexRef}, + db::api::storage::{ + edges::{ + edge_ref::EdgeStorageRef, + edge_storage_ops::{EdgeStorageOps, TimeIndexRef}, + }, + tprop_storage_ops::TPropOps, }, }; -#[cfg(feature = "storage")] -use crate::disk_graph::storage_interface::edge::DiskEdge; - -use crate::db::api::storage::tprop_storage_ops::TPropOps; -use rayon::prelude::*; -use std::ops::Range; +use super::edge_storage_ops::MemEdge; #[derive(Debug)] pub enum EdgeStorageEntry<'a> { - Mem(&'a EdgeStore), - Unlocked(Entry<'a, EdgeStore>), + Mem(MemEdge<'a>), + Unlocked(EdgeRGuard<'a>), #[cfg(feature = "storage")] Disk(DiskEdge<'a>), } @@ -31,8 +32,8 @@ impl<'a> EdgeStorageEntry<'a> { #[inline] pub fn as_ref(&self) -> EdgeStorageRef { match self { - EdgeStorageEntry::Mem(edge) => EdgeStorageRef::Mem(edge), - EdgeStorageEntry::Unlocked(edge) => EdgeStorageRef::Mem(edge), + EdgeStorageEntry::Mem(edge) => EdgeStorageRef::Mem(*edge), + EdgeStorageEntry::Unlocked(edge) => EdgeStorageRef::Mem(edge.as_mem_edge()), #[cfg(feature = "storage")] EdgeStorageEntry::Disk(edge) => 
EdgeStorageRef::Disk(*edge), } diff --git a/raphtory/src/db/api/storage/edges/edge_owned_entry.rs b/raphtory/src/db/api/storage/edges/edge_owned_entry.rs index 3b4536da19..750ec8bca5 100644 --- a/raphtory/src/db/api/storage/edges/edge_owned_entry.rs +++ b/raphtory/src/db/api/storage/edges/edge_owned_entry.rs @@ -1,14 +1,17 @@ +use std::ops::Range; + +use rayon::iter::ParallelIterator; + +use raphtory_api::core::storage::timeindex::TimeIndexEntry; + #[cfg(feature = "storage")] use crate::db::api::storage::variants::storage_variants::StorageVariants; #[cfg(feature = "storage")] use crate::disk_graph::storage_interface::edge::DiskOwnedEdge; use crate::{ core::{ - entities::{ - edges::{edge_ref::EdgeRef, edge_store::EdgeStore}, - LayerIds, VID, - }, - storage::ArcEntry, + entities::{edges::edge_ref::EdgeRef, LayerIds, VID}, + storage::raw_edges::EdgeArcGuard, }, db::api::storage::{ edges::{ @@ -18,13 +21,10 @@ use crate::{ tprop_storage_ops::TPropOps, }, }; -use raphtory_api::core::storage::timeindex::TimeIndexEntry; -use rayon::iter::ParallelIterator; -use std::ops::Range; #[derive(Debug, Clone)] pub enum EdgeOwnedEntry { - Mem(ArcEntry), + Mem(EdgeArcGuard), #[cfg(feature = "storage")] Disk(DiskOwnedEdge), } @@ -51,7 +51,7 @@ macro_rules! 
for_all_variants { impl EdgeOwnedEntry { pub fn as_ref(&self) -> EdgeStorageRef { match self { - EdgeOwnedEntry::Mem(entry) => EdgeStorageRef::Mem(entry), + EdgeOwnedEntry::Mem(entry) => EdgeStorageRef::Mem(entry.as_mem_edge()), #[cfg(feature = "storage")] EdgeOwnedEntry::Disk(entry) => EdgeStorageRef::Disk(entry.as_ref()), } diff --git a/raphtory/src/db/api/storage/edges/edge_ref.rs b/raphtory/src/db/api/storage/edges/edge_ref.rs index cf2fa4db53..1f2be867c8 100644 --- a/raphtory/src/db/api/storage/edges/edge_ref.rs +++ b/raphtory/src/db/api/storage/edges/edge_ref.rs @@ -1,19 +1,20 @@ +use std::ops::Range; + +use rayon::prelude::*; + #[cfg(feature = "storage")] use crate::db::api::storage::variants::storage_variants::StorageVariants; #[cfg(feature = "storage")] use crate::disk_graph::storage_interface::edge::DiskEdge; use crate::{ - core::entities::{ - edges::{edge_ref::EdgeRef, edge_store::EdgeStore}, - LayerIds, EID, VID, - }, + core::entities::{edges::edge_ref::EdgeRef, LayerIds, EID, VID}, db::api::storage::{ edges::edge_storage_ops::{EdgeStorageOps, TimeIndexRef}, tprop_storage_ops::TPropOps, }, }; -use rayon::prelude::*; -use std::ops::Range; + +use super::edge_storage_ops::MemEdge; macro_rules! for_all { ($value:expr, $pattern:pat => $result:expr) => { @@ -46,7 +47,7 @@ macro_rules! 
for_all_iter { #[derive(Copy, Clone, Debug)] pub enum EdgeStorageRef<'a> { - Mem(&'a EdgeStore), + Mem(MemEdge<'a>), #[cfg(feature = "storage")] Disk(DiskEdge<'a>), } @@ -55,7 +56,7 @@ impl<'a> EdgeStorageRef<'a> { #[inline] pub fn eid(&self) -> EID { match self { - EdgeStorageRef::Mem(e) => e.eid, + EdgeStorageRef::Mem(e) => e.eid(), #[cfg(feature = "storage")] EdgeStorageRef::Disk(e) => e.eid(), } diff --git a/raphtory/src/db/api/storage/edges/edge_storage_ops.rs b/raphtory/src/db/api/storage/edges/edge_storage_ops.rs index 40e96574f6..a2017e1f7d 100644 --- a/raphtory/src/db/api/storage/edges/edge_storage_ops.rs +++ b/raphtory/src/db/api/storage/edges/edge_storage_ops.rs @@ -4,7 +4,12 @@ use crate::{ edges::{edge_ref::EdgeRef, edge_store::EdgeStore}, LayerIds, VID, }, - storage::timeindex::{TimeIndex, TimeIndexIntoOps, TimeIndexOps, TimeIndexWindow}, + storage::{ + locked_view::LockedView, + timeindex::{ + LockedTimeIndexWindow, TimeIndex, TimeIndexIntoOps, TimeIndexOps, TimeIndexWindow, + }, + }, }, db::api::view::IntoDynBoxed, }; @@ -13,15 +18,21 @@ use crate::{ use pometry_storage::timestamps::TimeStamps; use crate::{ - core::entities::properties::tprop::TProp, + core::{ + entities::properties::tprop::TProp, storage::raw_edges::EdgeShard, + utils::iter::GenLockedIter, + }, db::api::storage::{tprop_storage_ops::TPropOps, variants::layer_variants::LayerVariants}, }; -use raphtory_api::core::storage::timeindex::TimeIndexEntry; +use raphtory_api::core::{entities::EID, storage::timeindex::TimeIndexEntry}; use rayon::prelude::*; use std::ops::Range; +#[derive(Debug)] pub enum TimeIndexRef<'a> { Ref(&'a TimeIndex), + Locked(LockedView<'a, TimeIndex>), + LockedRange(LockedTimeIndexWindow<'a, TimeIndexEntry>), Range(TimeIndexWindow<'a, TimeIndexEntry>), #[cfg(feature = "storage")] External(TimeStamps<'a, TimeIndexEntry>), @@ -31,6 +42,8 @@ impl<'a> TimeIndexRef<'a> { pub fn len(&self) -> usize { match self { TimeIndexRef::Ref(ts) => ts.len(), + TimeIndexRef::Locked(ts) 
=> ts.len(), + TimeIndexRef::LockedRange(ts) => ts.len(), TimeIndexRef::Range(ts) => ts.len(), #[cfg(feature = "storage")] TimeIndexRef::External(ts) => ts.len(), @@ -46,6 +59,8 @@ impl<'a> TimeIndexOps for TimeIndexRef<'a> { fn active(&self, w: Range) -> bool { match self { TimeIndexRef::Ref(t) => t.active(w), + TimeIndexRef::Locked(t) => t.active(w), + TimeIndexRef::LockedRange(t) => t.active(w), TimeIndexRef::Range(ref t) => t.active(w), #[cfg(feature = "storage")] TimeIndexRef::External(ref t) => t.active(w), @@ -55,6 +70,8 @@ impl<'a> TimeIndexOps for TimeIndexRef<'a> { fn range(&self, w: Range) -> Self::RangeType<'_> { match self { TimeIndexRef::Ref(t) => TimeIndexRef::Range(t.range(w)), + TimeIndexRef::Locked(t) => TimeIndexRef::Range(t.range(w)), + TimeIndexRef::LockedRange(t) => TimeIndexRef::Range(t.range(w)), TimeIndexRef::Range(ref t) => TimeIndexRef::Range(t.range(w)), #[cfg(feature = "storage")] TimeIndexRef::External(ref t) => TimeIndexRef::External(t.range(w)), @@ -64,6 +81,8 @@ impl<'a> TimeIndexOps for TimeIndexRef<'a> { fn first(&self) -> Option { match self { TimeIndexRef::Ref(t) => t.first(), + TimeIndexRef::Locked(t) => t.first(), + TimeIndexRef::LockedRange(t) => t.first(), TimeIndexRef::Range(ref t) => t.first(), #[cfg(feature = "storage")] TimeIndexRef::External(ref t) => t.first(), @@ -73,6 +92,8 @@ impl<'a> TimeIndexOps for TimeIndexRef<'a> { fn last(&self) -> Option { match self { TimeIndexRef::Ref(t) => t.last(), + TimeIndexRef::Locked(t) => t.last(), + TimeIndexRef::LockedRange(t) => t.last(), TimeIndexRef::Range(ref t) => t.last(), #[cfg(feature = "storage")] TimeIndexRef::External(ref t) => t.last(), @@ -82,6 +103,8 @@ impl<'a> TimeIndexOps for TimeIndexRef<'a> { fn iter(&self) -> Box + Send + '_> { match self { TimeIndexRef::Ref(t) => t.iter(), + TimeIndexRef::Locked(t) => t.iter(), + TimeIndexRef::LockedRange(t) => t.iter(), TimeIndexRef::Range(t) => t.iter(), #[cfg(feature = "storage")] TimeIndexRef::External(ref t) => t.iter(), 
@@ -91,6 +114,8 @@ impl<'a> TimeIndexOps for TimeIndexRef<'a> { fn len(&self) -> usize { match self { TimeIndexRef::Ref(ts) => ts.len(), + TimeIndexRef::Locked(ts) => ts.len(), + TimeIndexRef::LockedRange(ts) => ts.len(), TimeIndexRef::Range(ts) => ts.len(), #[cfg(feature = "storage")] TimeIndexRef::External(ref t) => t.len(), @@ -106,6 +131,8 @@ impl<'a> TimeIndexIntoOps for TimeIndexRef<'a> { fn into_range(self, w: Range) -> TimeIndexRef<'a> { match self { TimeIndexRef::Ref(t) => TimeIndexRef::Range(t.range_inner(w)), + TimeIndexRef::Locked(t) => TimeIndexRef::LockedRange(t.into_range(w)), + TimeIndexRef::LockedRange(t) => TimeIndexRef::LockedRange(t.into_range(w)), TimeIndexRef::Range(t) => TimeIndexRef::Range(t.into_range(w)), #[cfg(feature = "storage")] TimeIndexRef::External(t) => TimeIndexRef::External(t.into_range(w)), @@ -114,6 +141,8 @@ impl<'a> TimeIndexIntoOps for TimeIndexRef<'a> { fn into_iter(self) -> impl Iterator + Send { match self { TimeIndexRef::Ref(t) => t.iter(), + TimeIndexRef::Locked(t) => GenLockedIter::from(t, |t| t.iter()).into_dyn_boxed(), + TimeIndexRef::LockedRange(t) => t.into_iter().into_dyn_boxed(), TimeIndexRef::Range(t) => t.into_iter().into_dyn_boxed(), #[cfg(feature = "storage")] TimeIndexRef::External(t) => t.into_iter().into_dyn_boxed(), @@ -234,17 +263,80 @@ pub trait EdgeStorageOps<'a>: Copy + Sized + Send + Sync + 'a { } } -impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { +#[derive(Clone, Copy, Debug)] +pub struct MemEdge<'a> { + edges: &'a EdgeShard, + offset: usize, +} + +impl<'a> MemEdge<'a> { + pub fn new(edges: &'a EdgeShard, offset: usize) -> Self { + MemEdge { edges, offset } + } + + pub fn edge_store(&self) -> &EdgeStore { + self.edges.edge_store(self.offset) + } + + pub fn eid(self) -> EID { + self.edge_store().eid + } + + pub fn as_edge_ref(&self) -> EdgeRef { + EdgeRef::new_outgoing(self.eid(), self.src(), self.dst()) + } + + pub fn internal_num_layers(self) -> usize { + self.edges.internal_num_layers() + } + + fn 
get_additions(self, layer_id: usize) -> Option<&'a TimeIndex> { + self.edges.additions(self.offset, layer_id) + } + + fn get_deletions(self, layer_id: usize) -> Option<&'a TimeIndex> { + self.edges.deletions(self.offset, layer_id) + } + + pub fn has_layer_inner(self, layer_id: usize) -> bool { + self.get_additions(layer_id) + .filter(|t_index| !t_index.is_empty()) + .is_some() + || self + .get_deletions(layer_id) + .filter(|t_index| !t_index.is_empty()) + .is_some() + } + + pub fn temporal_prop_layer_inner(self, layer_id: usize, prop_id: usize) -> Option<&'a TProp> { + let layer = self.edges.props(self.offset, layer_id)?; + layer.temporal_property(prop_id) + } +} + +impl<'a> EdgeStorageOps<'a> for MemEdge<'a> { fn in_ref(self) -> EdgeRef { - EdgeRef::new_incoming(self.eid, self.src, self.dst) + EdgeRef::new_incoming(self.eid(), self.src(), self.dst()) } fn out_ref(self) -> EdgeRef { - EdgeRef::new_outgoing(self.eid, self.src, self.dst) + EdgeRef::new_outgoing(self.eid(), self.src(), self.dst()) } fn active(self, layer_ids: &LayerIds, w: Range) -> bool { - self.active(layer_ids, w) + match layer_ids { + LayerIds::None => false, + LayerIds::All => self + .additions_iter(layer_ids) + .any(|(_, t_index)| t_index.active_t(w.clone())), + LayerIds::One(l_id) => self + .get_additions(*l_id) + .filter(|a| a.active_t(w)) + .is_some(), + LayerIds::Multiple(layers) => layers + .iter() + .any(|l_id| self.active(&LayerIds::One(*l_id), w.clone())), + } } fn has_layer(self, layer_ids: &LayerIds) -> bool { @@ -257,25 +349,27 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { } fn src(self) -> VID { - self.src + self.edge_store().src } fn dst(self) -> VID { - self.dst + self.edge_store().dst } fn layer_ids_iter(self, layer_ids: &'a LayerIds) -> impl Iterator + 'a { match layer_ids { LayerIds::None => LayerVariants::None(std::iter::empty()), LayerIds::All => LayerVariants::All( - (0..self.internal_num_layers()).filter(|&l| self.has_layer_inner(l)), + 
(0..self.internal_num_layers()).filter(move |&l| self.has_layer_inner(l)), ), LayerIds::One(id) => { LayerVariants::One(self.has_layer_inner(*id).then_some(*id).into_iter()) } - LayerIds::Multiple(ids) => { - LayerVariants::Multiple(ids.iter().copied().filter(|&id| self.has_layer_inner(id))) - } + LayerIds::Multiple(ids) => LayerVariants::Multiple( + ids.iter() + .copied() + .filter(move |&id| self.has_layer_inner(id)), + ), } } @@ -288,7 +382,7 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { LayerIds::All => LayerVariants::All( (0..self.internal_num_layers()) .into_par_iter() - .filter(|&l| self.has_layer_inner(l)), + .filter(move |&l| self.has_layer_inner(l)), ), LayerIds::One(id) => { LayerVariants::One(self.has_layer_inner(*id).then_some(*id).into_par_iter()) @@ -296,7 +390,7 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { LayerIds::Multiple(ids) => LayerVariants::Multiple( ids.par_iter() .copied() - .filter(|&id| self.has_layer_inner(id)), + .filter(move |&id| self.has_layer_inner(id)), ), } } @@ -309,6 +403,7 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore { TimeIndexRef::Ref(self.get_deletions(layer_id).unwrap_or(&TimeIndex::Empty)) } + #[inline(always)] fn temporal_prop_layer(self, layer_id: usize, prop_id: usize) -> impl TPropOps<'a> + 'a { self.temporal_prop_layer_inner(layer_id, prop_id) .unwrap_or(&TProp::Empty) diff --git a/raphtory/src/db/api/storage/edges/edges.rs b/raphtory/src/db/api/storage/edges/edges.rs index 7ded8af78c..45ed2a88c7 100644 --- a/raphtory/src/db/api/storage/edges/edges.rs +++ b/raphtory/src/db/api/storage/edges/edges.rs @@ -1,22 +1,25 @@ -use super::edge_entry::EdgeStorageEntry; -use crate::{ - core::{ - entities::{edges::edge_store::EdgeStore, LayerIds, EID}, - storage::ReadLockedStorage, - }, - db::api::storage::{ - edges::edge_storage_ops::EdgeStorageOps, nodes::unlocked::UnlockedEdges, - variants::storage_variants3::StorageVariants, - }, -}; -use rayon::iter::ParallelIterator; use std::sync::Arc; +use 
rayon::iter::ParallelIterator; + +#[cfg(not(feature = "storage"))] +use either::Either; + +#[cfg(feature = "storage")] +use crate::db::api::storage::variants::storage_variants3::StorageVariants; #[cfg(feature = "storage")] -use crate::disk_graph::storage_interface::{edges::DiskEdges, edges_ref::DiskEdgesRef}; +use crate::disk_graph::storage_interface::edges::DiskEdges; +#[cfg(feature = "storage")] +use crate::disk_graph::storage_interface::edges_ref::DiskEdgesRef; +use crate::{ + core::entities::{graph::edges::LockedEdges, LayerIds}, + db::api::storage::{edges::edge_storage_ops::EdgeStorageOps, nodes::unlocked::UnlockedEdges}, +}; + +use super::edge_entry::EdgeStorageEntry; pub enum EdgesStorage { - Mem(Arc>), + Mem(Arc), #[cfg(feature = "storage")] Disk(DiskEdges), } @@ -34,7 +37,7 @@ impl EdgesStorage { #[derive(Debug)] pub enum EdgesStorageRef<'a> { - Mem(&'a ReadLockedStorage), + Mem(&'a LockedEdges), Unlocked(UnlockedEdges<'a>), #[cfg(feature = "storage")] Disk(DiskEdgesRef<'a>), @@ -65,16 +68,16 @@ impl<'a> EdgesStorageRef<'a> { #[cfg(not(feature = "storage"))] pub fn iter(self, layers: LayerIds) -> impl Iterator> { match self { - EdgesStorageRef::Mem(storage) => StorageVariants::Mem( + EdgesStorageRef::Mem(storage) => Either::Left( storage .iter() .filter(move |e| e.has_layer(&layers)) .map(EdgeStorageEntry::Mem), ), - EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked( + EdgesStorageRef::Unlocked(edges) => Either::Right( edges .iter() - .filter(move |e| e.has_layer(&layers)) + .filter(move |e| e.as_mem_edge().has_layer(&layers)) .map(EdgeStorageEntry::Unlocked), ), } @@ -104,16 +107,16 @@ impl<'a> EdgesStorageRef<'a> { #[cfg(not(feature = "storage"))] pub fn par_iter(self, layers: LayerIds) -> impl ParallelIterator> { match self { - EdgesStorageRef::Mem(storage) => StorageVariants::Mem( + EdgesStorageRef::Mem(storage) => Either::Left( storage .par_iter() .filter(move |e| e.has_layer(&layers)) .map(EdgeStorageEntry::Mem), ), - 
EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked( + EdgesStorageRef::Unlocked(edges) => Either::Right( edges .par_iter() - .filter(move |e| e.has_layer(&layers)) + .filter(move |e| e.as_mem_edge().has_layer(&layers)) .map(EdgeStorageEntry::Unlocked), ), } @@ -130,7 +133,10 @@ impl<'a> EdgesStorageRef<'a> { EdgesStorageRef::Unlocked(edges) => match layers { LayerIds::None => 0, LayerIds::All => edges.len(), - _ => edges.par_iter().filter(|e| e.has_layer(layers)).count(), + _ => edges + .par_iter() + .filter(|e| e.as_mem_edge().has_layer(layers)) + .count(), }, #[cfg(feature = "storage")] EdgesStorageRef::Disk(storage) => storage.count(layers), diff --git a/raphtory/src/db/api/storage/locked.rs b/raphtory/src/db/api/storage/locked.rs index 8e8d5d2cc1..636c4a0d79 100644 --- a/raphtory/src/db/api/storage/locked.rs +++ b/raphtory/src/db/api/storage/locked.rs @@ -1,11 +1,21 @@ +use std::sync::Arc; + use crate::core::{ - entities::{edges::edge_store::EdgeStore, nodes::node_store::NodeStore, EID, VID}, + entities::{graph::edges::LockedEdges, nodes::node_store::NodeStore, VID}, storage::ReadLockedStorage, }; -use std::sync::Arc; -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct LockedGraph { pub(crate) nodes: Arc>, - pub(crate) edges: Arc>, + pub(crate) edges: Arc, +} + +impl Clone for LockedGraph { + fn clone(&self) -> Self { + LockedGraph { + nodes: self.nodes.clone(), + edges: self.edges.clone(), + } + } } diff --git a/raphtory/src/db/api/storage/nodes/unlocked.rs b/raphtory/src/db/api/storage/nodes/unlocked.rs index 2217dcce21..2dcd6f229a 100644 --- a/raphtory/src/db/api/storage/nodes/unlocked.rs +++ b/raphtory/src/db/api/storage/nodes/unlocked.rs @@ -1,115 +1,27 @@ -use crate::core::{ - entities::{ - edges::edge_store::EdgeStore, graph::tgraph::InternalGraph, nodes::node_store::NodeStore, - LayerIds, - }, - storage::{ArcEntry, Entry}, -}; -use ouroboros::self_referencing; -use raphtory_api::core::{ - entities::{edges::edge_ref::EdgeRef, EID, VID}, - 
Direction, -}; +use crate::core::{entities::graph::tgraph_storage::GraphStorage, storage::raw_edges::EdgeRGuard}; +use raphtory_api::core::entities::EID; use rayon::prelude::*; -impl<'a> Entry<'a, NodeStore> { - pub fn into_edges_iter( - self, - layers: &'a LayerIds, - dir: Direction, - ) -> impl Iterator + 'a { - LockedEdgesRefIterBuilder { - entry: self, - iter_builder: |node| Box::new(node.edge_tuples(layers, dir)), - } - .build() - } -} - -#[self_referencing] -pub struct LockedEdgesRefIter<'a> { - entry: Entry<'a, NodeStore>, - #[borrows(entry)] - #[covariant] - iter: Box + Send + 'this>, -} - -impl<'a> Iterator for LockedEdgesRefIter<'a> { - type Item = EdgeRef; - - fn next(&mut self) -> Option { - self.with_iter_mut(|iter| iter.next()) - } -} - -#[derive(Clone, Copy, Debug)] -pub struct UnlockedNodes<'a>(pub &'a InternalGraph); - -impl<'a> UnlockedNodes<'a> { - pub fn len(self) -> usize { - self.0.inner().storage.nodes.len() - } - - pub fn node(&self, vid: VID) -> Entry<'a, NodeStore> { - self.0.inner().storage.nodes.entry(vid) - } - - pub fn iter(self) -> impl Iterator> + 'a { - let storage = &self.0.inner().storage.nodes; - (0..storage.len()).map(VID).map(|vid| storage.entry(vid)) - } - - pub fn par_iter(self) -> impl ParallelIterator> + 'a { - let storage = &self.0.inner().storage.nodes; - (0..storage.len()) - .into_par_iter() - .map(VID) - .map(|vid| storage.entry(vid)) - } -} - -#[derive(Debug, Clone)] -pub struct UnlockedOwnedNode { - g: InternalGraph, - vid: VID, -} - -impl UnlockedOwnedNode { - pub fn new(g: InternalGraph, vid: VID) -> Self { - Self { g, vid } - } - - pub fn arc_node(&self) -> ArcEntry { - self.g.inner().storage.nodes.entry_arc(self.vid) - } - - pub fn into_edges_iter( - self, - layers: LayerIds, - dir: Direction, - ) -> impl Iterator { - self.arc_node().into_edges(&layers, dir) - } -} - #[derive(Copy, Clone, Debug)] -pub struct UnlockedEdges<'a>(pub &'a InternalGraph); +pub struct UnlockedEdges<'a>(pub(crate) &'a GraphStorage); 
impl<'a> UnlockedEdges<'a> { - pub fn iter(self) -> impl Iterator> + 'a { - let storage = &self.0.inner().storage.edges; - (0..storage.len()).map(EID).map(|eid| storage.entry(eid)) + pub fn iter(self) -> impl Iterator> + 'a { + let storage = self.0; + (0..storage.edges_len()) + .map(EID) + .map(|eid| storage.edge_entry(eid)) } - pub fn par_iter(self) -> impl ParallelIterator> + 'a { - let storage = &self.0.inner().storage.edges; - (0..storage.len()) + pub fn par_iter(self) -> impl ParallelIterator> + 'a { + let storage = self.0; + (0..storage.edges_len()) .into_par_iter() .map(EID) - .map(|eid| storage.entry(eid)) + .map(|eid| storage.edge_entry(eid)) } pub fn len(self) -> usize { - self.0.inner().storage.edges.len() + self.0.edges_len() } } diff --git a/raphtory/src/db/api/storage/storage_ops.rs b/raphtory/src/db/api/storage/storage_ops.rs index 9001307010..57e8357aba 100644 --- a/raphtory/src/db/api/storage/storage_ops.rs +++ b/raphtory/src/db/api/storage/storage_ops.rs @@ -87,7 +87,7 @@ impl GraphStorage { match self { GraphStorage::Mem(storage) => NodeStorageEntry::Mem(storage.nodes.get(vid)), GraphStorage::Unlocked(storage) => { - NodeStorageEntry::Unlocked(storage.inner().node_entry(vid)) + NodeStorageEntry::Unlocked(storage.inner().storage.get_node(vid)) } #[cfg(feature = "storage")] GraphStorage::Disk(storage) => NodeStorageEntry::Disk(DiskNode::new(storage, vid)), @@ -110,7 +110,9 @@ impl GraphStorage { pub fn edges(&self) -> EdgesStorageRef { match self { GraphStorage::Mem(storage) => EdgesStorageRef::Mem(&storage.edges), - GraphStorage::Unlocked(storage) => EdgesStorageRef::Unlocked(UnlockedEdges(storage)), + GraphStorage::Unlocked(storage) => { + EdgesStorageRef::Unlocked(UnlockedEdges(&storage.inner().storage)) + } #[cfg(feature = "storage")] GraphStorage::Disk(storage) => EdgesStorageRef::Disk(DiskEdgesRef::new(storage)), } @@ -129,7 +131,7 @@ impl GraphStorage { match self { GraphStorage::Mem(storage) => 
EdgeStorageEntry::Mem(storage.edges.get(eid.pid())), GraphStorage::Unlocked(storage) => { - EdgeStorageEntry::Unlocked(storage.inner().edge_entry(eid.pid())) + EdgeStorageEntry::Unlocked(storage.inner().storage.edge_entry(eid.pid())) } #[cfg(feature = "storage")] GraphStorage::Disk(storage) => { @@ -226,15 +228,10 @@ impl GraphStorage { ) -> impl ParallelIterator + 'graph { view.node_list().into_par_iter().filter(move |&vid| { let node = self.node(vid); - let n = node.name(); - let i = node.node_type_id(); let r = type_filter .as_ref() .map_or(true, |type_filter| type_filter[node.node_type_id()]); let s = view.filter_node(self.node(vid).as_ref(), view.layer_ids()); - - println!("name = {:?}, id = {}, r = {}, s = {}", n, i, r, s); - r && s }) } diff --git a/raphtory/src/db/api/storage/tprop_storage_ops.rs b/raphtory/src/db/api/storage/tprop_storage_ops.rs index 3e98686ff2..31b7944fa6 100644 --- a/raphtory/src/db/api/storage/tprop_storage_ops.rs +++ b/raphtory/src/db/api/storage/tprop_storage_ops.rs @@ -1,14 +1,25 @@ -use crate::core::{entities::properties::tprop::TProp, storage::timeindex::AsTime, Prop}; -#[cfg(feature = "storage")] -use crate::db::api::storage::variants::storage_variants::StorageVariants; +use std::ops::{Deref, Range}; + +#[cfg(not(feature = "storage"))] +use either::Either; + #[cfg(feature = "storage")] use pometry_storage::tprops::DiskTProp; use raphtory_api::core::storage::timeindex::TimeIndexEntry; -use std::ops::Range; -#[derive(Copy, Clone, Debug)] +use crate::core::{ + entities::properties::tprop::TProp, + storage::{locked_view::LockedView, timeindex::AsTime}, + utils::iter::GenLockedIter, + Prop, +}; +#[cfg(feature = "storage")] +use crate::db::api::storage::variants::storage_variants3::StorageVariants; + +#[derive(Debug)] pub enum TPropRef<'a> { Mem(&'a TProp), + Locked(LockedView<'a, TProp>), #[cfg(feature = "storage")] Disk(DiskTProp<'a, TimeIndexEntry>), } @@ -17,6 +28,7 @@ macro_rules! 
for_all { ($value:expr, $pattern:pat => $result:expr) => { match $value { TPropRef::Mem($pattern) => $result, + TPropRef::Locked($pattern) => $result, #[cfg(feature = "storage")] TPropRef::Disk($pattern) => $result, } @@ -28,6 +40,7 @@ macro_rules! for_all_variants { ($value:expr, $pattern:pat => $result:expr) => { match $value { TPropRef::Mem($pattern) => StorageVariants::Mem($result), + TPropRef::Locked($pattern) => StorageVariants::Unlocked($result), TPropRef::Disk($pattern) => StorageVariants::Disk($result), } }; @@ -37,7 +50,8 @@ macro_rules! for_all_variants { macro_rules! for_all_variants { ($value:expr, $pattern:pat => $result:expr) => { match $value { - TPropRef::Mem($pattern) => $result, + TPropRef::Mem($pattern) => Either::Left($result), + TPropRef::Locked($pattern) => Either::Right($result), } }; } @@ -101,3 +115,22 @@ impl<'a> TPropOps<'a> for TPropRef<'a> { for_all!(self, tprop => tprop.len()) } } + +impl<'a> LockedView<'a, TProp> { + pub fn last_before(&self, t: i64) -> Option<(TimeIndexEntry, Prop)> { + self.deref().last_before(t) + } + + pub fn iter(self) -> impl Iterator + Send + 'a { + GenLockedIter::from(self, |t_prop| Box::new(t_prop.deref().iter())) + } + + pub fn iter_window( + self, + r: Range, + ) -> impl Iterator + Send + 'a { + GenLockedIter::from(self, move |t_prop| { + Box::new(t_prop.deref().iter_window(r.clone())) + }) + } +} diff --git a/raphtory/src/db/internal/core_ops.rs b/raphtory/src/db/internal/core_ops.rs index b8b50926d9..780e385572 100644 --- a/raphtory/src/db/internal/core_ops.rs +++ b/raphtory/src/db/internal/core_ops.rs @@ -1,7 +1,7 @@ use crate::{ core::{ entities::{ - edges::edge_ref::EdgeRef, + edges::{edge_ref::EdgeRef, edge_store::EdgeDataLike}, graph::tgraph::InternalGraph, nodes::node_ref::NodeRef, properties::{graph_meta::GraphMeta, props::Meta, tprop::TProp}, @@ -124,7 +124,7 @@ impl CoreGraphOps for InternalGraph { #[inline] fn constant_node_prop(&self, v: VID, prop_id: usize) -> Option { - let entry = 
self.inner().node_entry(v); + let entry = self.inner().storage.get_node(v); entry.const_prop(prop_id).cloned() } @@ -133,7 +133,8 @@ impl CoreGraphOps for InternalGraph { // FIXME: revisit the locking scheme so we don't have to collect the ids Box::new( self.inner() - .node_entry(v) + .storage + .get_node(v) .const_prop_ids() .collect_vec() .into_iter(), @@ -145,7 +146,8 @@ impl CoreGraphOps for InternalGraph { // FIXME: revisit the locking scheme so we don't have to collect the ids Box::new( self.inner() - .node_entry(v) + .storage + .get_node(v) .temporal_prop_ids() .collect_vec() .into_iter(), @@ -154,7 +156,7 @@ impl CoreGraphOps for InternalGraph { fn get_const_edge_prop(&self, e: EdgeRef, prop_id: usize, layer_ids: LayerIds) -> Option { let layer_ids = layer_ids.constrain_from_edge(e); - let entry = self.inner().edge_entry(e.pid()); + let entry = self.inner().storage.edge_entry(e.pid()); match layer_ids { LayerIds::None => None, LayerIds::All => { @@ -163,14 +165,12 @@ impl CoreGraphOps for InternalGraph { entry .layer_iter() .next() - .and_then(|data| data.layer.const_prop(prop_id).cloned()) + .and_then(|(_, data)| data.const_prop(prop_id).cloned()) } else { let prop_map: HashMap<_, _> = entry .layer_iter() - .enumerate() .flat_map(|(id, data)| { - data.layer - .const_prop(prop_id) + data.const_prop(prop_id) .map(|p| (self.inner().get_layer_name(id), p.clone())) }) .collect(); @@ -186,9 +186,8 @@ impl CoreGraphOps for InternalGraph { let prop_map: HashMap<_, _> = ids .iter() .flat_map(|&id| { - entry.layer(id).and_then(|layer| { - layer - .const_prop(prop_id) + entry.layer(id).and_then(|data| { + data.const_prop(prop_id) .map(|p| (self.inner().get_layer_name(id), p.clone())) }) }) @@ -207,14 +206,14 @@ impl CoreGraphOps for InternalGraph { e: EdgeRef, layer_ids: LayerIds, ) -> Box + '_> { - // FIXME: revisit the locking scheme so we don't have to collect all the ids + // // FIXME: revisit the locking scheme so we don't have to collect all the ids let 
layer_ids = layer_ids.constrain_from_edge(e); - let entry = self.inner().edge_entry(e.pid()); + let entry = self.inner().storage.edge_entry(e.pid()); let ids: Vec<_> = match layer_ids { LayerIds::None => vec![], LayerIds::All => entry .layer_iter() - .map(|data| data.layer.const_prop_ids()) + .map(|(_, data)| data.const_prop_ids()) .kmerge() .dedup() .collect(), @@ -237,8 +236,8 @@ impl CoreGraphOps for InternalGraph { e: EdgeRef, layer_ids: &LayerIds, ) -> Box + '_> { - // FIXME: revisit the locking scheme so we don't have to collect the ids - let entry = self.inner().edge_entry(e.pid()); + // // FIXME: revisit the locking scheme so we don't have to collect the ids + let entry = self.inner().storage.edge_entry(e.pid()); match layer_ids { LayerIds::None => Box::new(iter::empty()), LayerIds::All => Box::new(entry.temp_prop_ids(None).collect_vec().into_iter()), @@ -256,17 +255,17 @@ impl CoreGraphOps for InternalGraph { #[inline] fn core_edges(&self) -> EdgesStorage { - EdgesStorage::Mem(Arc::new(self.inner().storage.edges.read_lock())) + EdgesStorage::Mem(Arc::new(self.inner().storage.edges_read_lock())) } #[inline] fn core_nodes(&self) -> NodesStorage { - NodesStorage::Mem(Arc::new(self.inner().storage.nodes.read_lock())) + NodesStorage::Mem(Arc::new(self.inner().storage.nodes_read_lock())) } #[inline] fn core_edge(&self, eid: ELID) -> EdgeStorageEntry { - EdgeStorageEntry::Unlocked(self.inner().storage.edges.entry(eid.pid())) + EdgeStorageEntry::Unlocked(self.inner().storage.edge_entry(eid.pid())) } #[inline] @@ -279,12 +278,12 @@ impl CoreGraphOps for InternalGraph { } fn core_edge_arc(&self, eid: ELID) -> EdgeOwnedEntry { - EdgeOwnedEntry::Mem(self.inner().storage.edges.entry_arc(eid.pid())) + EdgeOwnedEntry::Mem(self.inner().storage.get_edge_arc(eid.pid())) } #[inline] fn unfiltered_num_edges(&self) -> usize { - self.inner().storage.edges.len() + self.inner().storage.edges_len() } } diff --git a/raphtory/src/db/internal/list_ops.rs 
b/raphtory/src/db/internal/list_ops.rs index 3be8e16cf6..ccbdefd7d5 100644 --- a/raphtory/src/db/internal/list_ops.rs +++ b/raphtory/src/db/internal/list_ops.rs @@ -6,13 +6,13 @@ use crate::{ impl ListOps for InternalGraph { fn node_list(&self) -> NodeList { NodeList::All { - num_nodes: self.inner().storage.nodes.len(), + num_nodes: self.inner().storage.nodes_len(), } } fn edge_list(&self) -> EdgeList { EdgeList::All { - num_edges: self.inner().storage.edges.len(), + num_edges: self.inner().storage.edges_len(), } } } diff --git a/raphtory/src/db/internal/prop_add.rs b/raphtory/src/db/internal/prop_add.rs index 0c2ac9611f..d98a10aab6 100644 --- a/raphtory/src/db/internal/prop_add.rs +++ b/raphtory/src/db/internal/prop_add.rs @@ -68,7 +68,7 @@ impl InternalPropertyAdditionOps for InternalGraph { props: Vec<(usize, Prop)>, ) -> Result<(), GraphError> { let mut edge = self.inner().storage.get_edge_mut(eid); - let mut edge_layer = edge.layer_mut(layer); + let edge_layer = edge.layer_mut(layer); for (prop_id, value) in props { edge_layer .add_constant_prop(prop_id, value) @@ -93,7 +93,7 @@ impl InternalPropertyAdditionOps for InternalGraph { props: Vec<(usize, Prop)>, ) -> Result<(), GraphError> { let mut edge = self.inner().storage.get_edge_mut(eid); - let mut edge_layer = edge.layer_mut(layer); + let edge_layer = edge.layer_mut(layer); for (prop_id, value) in props { edge_layer.update_constant_prop(prop_id, value)?; } diff --git a/raphtory/src/db/internal/time_semantics.rs b/raphtory/src/db/internal/time_semantics.rs index 732a669cbe..f8f9a3f01c 100644 --- a/raphtory/src/db/internal/time_semantics.rs +++ b/raphtory/src/db/internal/time_semantics.rs @@ -26,11 +26,21 @@ use std::ops::Range; impl TimeSemantics for InternalGraph { fn node_earliest_time(&self, v: VID) -> Option { - self.inner().node_entry(v).value().timestamps().first_t() + self.inner() + .storage + .get_node(v) + .value() + .timestamps() + .first_t() } fn node_latest_time(&self, v: VID) -> Option { - 
self.inner().node_entry(v).value().timestamps().last_t() + self.inner() + .storage + .get_node(v) + .value() + .timestamps() + .last_t() } fn view_start(&self) -> Option { @@ -71,7 +81,8 @@ impl TimeSemantics for InternalGraph { fn node_earliest_time_window(&self, v: VID, start: i64, end: i64) -> Option { self.inner() - .node_entry(v) + .storage + .get_node(v) .value() .timestamps() .range_t(start..end) @@ -80,7 +91,8 @@ impl TimeSemantics for InternalGraph { fn node_latest_time_window(&self, v: VID, start: i64, end: i64) -> Option { self.inner() - .node_entry(v) + .storage + .get_node(v) .value() .timestamps() .range_t(start..end) @@ -158,7 +170,7 @@ impl TimeSemantics for InternalGraph { } fn edge_exploded(&self, e: EdgeRef, layer_ids: &LayerIds) -> BoxedIter { - let entry = self.inner().storage.edges.entry_arc(e.pid()); + let entry = self.inner().storage.get_edge_arc(e.pid()); entry.into_exploded(layer_ids.clone(), e).into_dyn_boxed() } @@ -308,17 +320,17 @@ impl TimeSemantics for InternalGraph { } fn has_temporal_node_prop(&self, v: VID, prop_id: usize) -> bool { - let entry = self.inner().storage.nodes.get(v); + let entry = self.inner().storage.nodes.entry(v); entry.temporal_property(prop_id).is_some() } fn temporal_node_prop_vec(&self, v: VID, prop_id: usize) -> Vec<(i64, Prop)> { - let node = self.inner().storage.nodes.get(v); + let node = self.inner().storage.nodes.entry(v); node.temporal_properties(prop_id, None).collect() } fn has_temporal_node_prop_window(&self, v: VID, prop_id: usize, w: Range) -> bool { - let entry = self.inner().storage.nodes.get(v); + let entry = self.inner().storage.nodes.entry(v); entry .temporal_property(prop_id) .filter(|p| p.iter_window_t(w).next().is_some()) @@ -335,7 +347,7 @@ impl TimeSemantics for InternalGraph { self.inner() .storage .nodes - .get(v) + .entry(v) .temporal_properties(prop_id, Some(start..end)) .collect() }