diff --git a/raphtory/src/core/entities/graph/edges.rs b/raphtory/src/core/entities/graph/edges.rs deleted file mode 100644 index cad887d31d..0000000000 --- a/raphtory/src/core/entities/graph/edges.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::{ - core::{ - entities::edges::edge_store::EdgeStore, - storage::raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard, EdgesShard, LockedEdgesShard}, - }, - db::api::storage::edges::edge_storage_ops::MemEdge, -}; -use raphtory_api::core::entities::EID; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Deserialize, Serialize, PartialEq)] -pub(crate) struct EdgesStorage { - edges: EdgesShard, -} - -#[derive(Debug)] -pub struct LockedEdges { - edges: LockedEdgesShard, -} - -impl LockedEdges { - pub fn iter(&self) -> impl Iterator + '_ { - self.edges.iter() - } - - pub fn par_iter(&self) -> impl rayon::iter::ParallelIterator + '_ { - self.edges.par_iter() - } - - #[inline] - pub fn get(&self, id: EID) -> MemEdge { - self.edges.get_mem(id) - } - - pub fn len(&self) -> usize { - self.edges.len() - } -} - -impl EdgesStorage { - pub(crate) fn new() -> Self { - Self { - edges: EdgesShard::new(), - } - } - - pub fn read_lock(&self) -> LockedEdges { - LockedEdges { - edges: self.edges.lock(), - } - } - - #[inline] - pub(crate) fn len(&self) -> usize { - self.edges.len() - } - - pub(crate) fn push(&self, edge: EdgeStore) -> EdgeWGuard { - let (eid, mut edge) = self.edges.push(edge); - edge.edge_store_mut().eid = eid; - edge - } - - #[inline] - pub(crate) fn entry(&self, id: EID) -> EdgeRGuard { - self.edges.get_edge(id) - } - - #[inline] - pub(crate) fn entry_mut(&self, id: EID) -> EdgeWGuard { - self.edges.get_edge_mut(id) - } - - #[inline] - pub(crate) fn entry_arc(&self, id: EID) -> EdgeArcGuard { - self.edges.get_edge_arc(id) - } -} diff --git a/raphtory/src/core/entities/graph/mod.rs b/raphtory/src/core/entities/graph/mod.rs index 64575837f6..75a98c5915 100644 --- a/raphtory/src/core/entities/graph/mod.rs +++ b/raphtory/src/core/entities/graph/mod.rs @@ -1,4 +1,3 @@ -pub mod edges; pub mod tgraph; pub mod tgraph_storage; pub(crate) mod timer; diff --git a/raphtory/src/core/entities/graph/tgraph_storage.rs b/raphtory/src/core/entities/graph/tgraph_storage.rs index 1d5ca695f1..efdda59595 100644 --- a/raphtory/src/core/entities/graph/tgraph_storage.rs +++ b/raphtory/src/core/entities/graph/tgraph_storage.rs @@ -1,9 +1,8 @@ -use super::edges::{EdgesStorage, LockedEdges}; use crate::core::{ entities::{edges::edge_store::EdgeStore, nodes::node_store::NodeStore, EID, VID}, storage::{ self, - raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard}, + raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard, EdgesStorage, LockedEdges}, Entry, EntryMut, PairEntryMut, }, }; @@ -50,7 +49,7 @@ impl GraphStorage { } #[inline] pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard { - self.edges.push(edge) + self.edges.push_edge(edge) } #[inline] @@ -59,8 +58,8 @@ impl GraphStorage { } #[inline] - pub(crate) fn get_edge_mut(&self, id: EID) -> EdgeWGuard { - self.edges.entry_mut(id) + pub(crate) fn get_edge_mut(&self, eid: EID) -> EdgeWGuard { + self.edges.get_edge_mut(eid) } #[inline] @@ -69,13 +68,13 @@ impl GraphStorage { } #[inline] - pub(crate) fn edge_entry(&self, id: EID) -> EdgeRGuard { - self.edges.entry(id) + pub(crate) fn edge_entry(&self, eid: EID) -> EdgeRGuard { + self.edges.get_edge(eid) } #[inline] - pub(crate) fn get_edge_arc(&self, id: EID) -> EdgeArcGuard { - self.edges.entry_arc(id) + pub(crate) fn get_edge_arc(&self, eid: EID) -> EdgeArcGuard { + self.edges.get_edge_arc(eid) } #[inline] diff --git a/raphtory/src/core/storage/raw_edges.rs b/raphtory/src/core/storage/raw_edges.rs index 4e6a4e2b36..b3119fe830 100644 --- a/raphtory/src/core/storage/raw_edges.rs +++ b/raphtory/src/core/storage/raw_edges.rs @@ -70,12 +70,12 @@ impl EdgeShard { pub const SHARD_SIZE: usize = 64; #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EdgesShard { +pub struct EdgesStorage { shards: Arc<[Arc>]>, len: Arc, } -impl PartialEq for EdgesShard { +impl PartialEq for EdgesStorage { fn eq(&self, other: &Self) -> bool { self.shards.len() == other.shards.len() && self @@ -86,7 +86,7 @@ impl PartialEq for EdgesShard { } } -impl EdgesShard { +impl EdgesStorage { pub fn new() -> Self { let mut shards = (0..SHARD_SIZE).into_iter().map(|_| { Arc::new(RwLock::new(EdgeShard { @@ -96,7 +96,7 @@ impl EdgesShard { deletions: Vec::with_capacity(0), })) }); - EdgesShard { + EdgesStorage { shards: shards.collect(), len: Arc::new(AtomicUsize::new(0)), } @@ -107,8 +107,29 @@ impl EdgesShard { self.len.load(atomic::Ordering::SeqCst) } - pub fn lock(&self) -> LockedEdgesShard { - LockedEdgesShard { + pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard { + let (eid, mut edge) = self.push(edge); + edge.edge_store_mut().eid = eid; + edge + } + + // #[inline] + // pub(crate) fn entry(&self, id: EID) -> EdgeRGuard { + // self.get_edge(id) + // } + + // #[inline] + // pub(crate) fn entry_mut(&self, id: EID) -> EdgeWGuard { + // self.get_edge_mut(id) + // } + + // #[inline] + // pub(crate) fn entry_arc(&self, id: EID) -> EdgeArcGuard { + // self.get_edge_arc(id) + // } + + pub fn read_lock(&self) -> LockedEdges { + LockedEdges { shards: self.shards.iter().map(|shard| shard.read_arc()).collect(), len: self.len(), } @@ -119,8 +140,8 @@ impl EdgesShard { resolve(index, self.shards.len()) } - pub fn push(&self, mut value: EdgeStore) -> (EID, EdgeWGuard) { - let index = self.len.fetch_add(1, atomic::Ordering::SeqCst); + fn push(&self, mut value: EdgeStore) -> (EID, EdgeWGuard) { + let index = self.len.fetch_add(1, atomic::Ordering::Relaxed); let (bucket, offset) = self.resolve(index); let mut shard = self.shards[bucket].write(); shard.insert(offset, value); @@ -263,12 +284,12 @@ impl<'a> EdgeRGuard<'a> { } #[derive(Debug)] -pub struct LockedEdgesShard { +pub struct LockedEdges { shards: Arc<[ArcRwLockReadGuard]>, len: usize, } -impl LockedEdgesShard { +impl LockedEdges { pub fn get(&self, eid: EID) -> &EdgeShard { let (bucket, offset) = resolve(eid.into(), self.shards.len()); let shard = &self.shards[bucket]; diff --git a/raphtory/src/db/api/storage/edges/edges.rs b/raphtory/src/db/api/storage/edges/edges.rs index 45ed2a88c7..cfc2443d11 100644 --- a/raphtory/src/db/api/storage/edges/edges.rs +++ b/raphtory/src/db/api/storage/edges/edges.rs @@ -1,22 +1,15 @@ -use std::sync::Arc; - -use rayon::iter::ParallelIterator; - -#[cfg(not(feature = "storage"))] -use either::Either; - -#[cfg(feature = "storage")] -use crate::db::api::storage::variants::storage_variants3::StorageVariants; -#[cfg(feature = "storage")] -use crate::disk_graph::storage_interface::edges::DiskEdges; +use super::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges}; #[cfg(feature = "storage")] -use crate::disk_graph::storage_interface::edges_ref::DiskEdgesRef; +use crate::disk_graph::storage_interface::{edges::DiskEdges, edges_ref::DiskEdgesRef}; use crate::{ - core::entities::{graph::edges::LockedEdges, LayerIds}, - db::api::storage::{edges::edge_storage_ops::EdgeStorageOps, nodes::unlocked::UnlockedEdges}, + core::{entities::LayerIds, storage::raw_edges::LockedEdges}, + db::api::storage::edges::edge_storage_ops::EdgeStorageOps, }; -use super::edge_entry::EdgeStorageEntry; +use crate::db::api::storage::variants::storage_variants3::StorageVariants; + +use rayon::iter::ParallelIterator; +use std::sync::Arc; pub enum EdgesStorage { Mem(Arc), @@ -68,13 +61,13 @@ impl<'a> EdgesStorageRef<'a> { #[cfg(not(feature = "storage"))] pub fn iter(self, layers: LayerIds) -> impl Iterator> { match self { - EdgesStorageRef::Mem(storage) => Either::Left( + EdgesStorageRef::Mem(storage) => StorageVariants::Mem( storage .iter() .filter(move |e| e.has_layer(&layers)) .map(EdgeStorageEntry::Mem), ), - EdgesStorageRef::Unlocked(edges) => Either::Right( + EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked( edges .iter() .filter(move |e| e.as_mem_edge().has_layer(&layers)) @@ -107,13 +100,13 @@ impl<'a> EdgesStorageRef<'a> { #[cfg(not(feature = "storage"))] pub fn par_iter(self, layers: LayerIds) -> impl ParallelIterator> { match self { - EdgesStorageRef::Mem(storage) => Either::Left( + EdgesStorageRef::Mem(storage) => StorageVariants::Mem( storage .par_iter() .filter(move |e| e.has_layer(&layers)) .map(EdgeStorageEntry::Mem), ), - EdgesStorageRef::Unlocked(edges) => Either::Right( + EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked( edges .par_iter() .filter(move |e| e.as_mem_edge().has_layer(&layers)) diff --git a/raphtory/src/db/api/storage/edges/mod.rs b/raphtory/src/db/api/storage/edges/mod.rs index 2b16658360..be155e8413 100644 --- a/raphtory/src/db/api/storage/edges/mod.rs +++ b/raphtory/src/db/api/storage/edges/mod.rs @@ -3,3 +3,4 @@ pub mod edge_owned_entry; pub mod edge_ref; pub mod edge_storage_ops; pub mod edges; +pub mod unlocked; diff --git a/raphtory/src/db/api/storage/nodes/unlocked.rs b/raphtory/src/db/api/storage/edges/unlocked.rs similarity index 100% rename from raphtory/src/db/api/storage/nodes/unlocked.rs rename to raphtory/src/db/api/storage/edges/unlocked.rs diff --git a/raphtory/src/db/api/storage/locked.rs b/raphtory/src/db/api/storage/locked.rs index 636c4a0d79..c9d1efb675 100644 --- a/raphtory/src/db/api/storage/locked.rs +++ b/raphtory/src/db/api/storage/locked.rs @@ -1,8 +1,8 @@ use std::sync::Arc; use crate::core::{ - entities::{graph::edges::LockedEdges, nodes::node_store::NodeStore, VID}, - storage::ReadLockedStorage, + entities::{nodes::node_store::NodeStore, VID}, + storage::{raw_edges::LockedEdges, ReadLockedStorage}, }; #[derive(Debug)] diff --git a/raphtory/src/db/api/storage/nodes/mod.rs b/raphtory/src/db/api/storage/nodes/mod.rs index 015bd0ee05..26081101c8 100644 --- a/raphtory/src/db/api/storage/nodes/mod.rs +++ b/raphtory/src/db/api/storage/nodes/mod.rs @@ -4,4 +4,3 @@ pub mod node_ref; pub mod node_storage_ops; pub mod nodes; pub mod nodes_ref; -pub mod unlocked; diff --git a/raphtory/src/db/api/storage/storage_ops.rs b/raphtory/src/db/api/storage/storage_ops.rs index 57e8357aba..a8cb1e2db4 100644 --- a/raphtory/src/db/api/storage/storage_ops.rs +++ b/raphtory/src/db/api/storage/storage_ops.rs @@ -42,8 +42,8 @@ use crate::{ use pometry_storage::graph::TemporalGraph; use super::{ - edges::edge_entry::EdgeStorageEntry, - nodes::{node_entry::NodeStorageEntry, unlocked::UnlockedEdges}, + edges::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges}, + nodes::node_entry::NodeStorageEntry, }; #[derive(Debug, Clone)] @@ -129,7 +129,7 @@ impl GraphStorage { pub fn edge(&self, eid: EdgeRef) -> EdgeStorageEntry { match self { - GraphStorage::Mem(storage) => EdgeStorageEntry::Mem(storage.edges.get(eid.pid())), + GraphStorage::Mem(storage) => EdgeStorageEntry::Mem(storage.edges.get_mem(eid.pid())), GraphStorage::Unlocked(storage) => { EdgeStorageEntry::Unlocked(storage.inner().storage.edge_entry(eid.pid())) } @@ -277,25 +277,25 @@ impl GraphStorage { EdgesStorage::Mem(edges) => { let iter = (0..edges.len()).map(EID); let filtered = match view.filter_state() { - FilterState::Neither => { - FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref())) - } + FilterState::Neither => FilterVariants::Neither( + iter.map(move |eid| edges.get_mem(eid).as_edge_ref()), + ), FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); (view.filter_edge(e, view.layer_ids()) && view.filter_node(nodes.node_entry(e.src()), view.layer_ids()) && view.filter_node(nodes.node_entry(e.dst()), view.layer_ids())) .then(|| e.out_ref()) })), FilterState::Nodes => FilterVariants::Nodes(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); (view.filter_node(nodes.node_entry(e.src()), view.layer_ids()) && view.filter_node(nodes.node_entry(e.dst()), view.layer_ids())) .then(|| e.out_ref()) })), FilterState::Edges | FilterState::BothIndependent => { FilterVariants::Edges(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); view.filter_edge(e, view.layer_ids()).then(|| e.out_ref()) })) } @@ -404,25 +404,25 @@ impl GraphStorage { EdgesStorage::Mem(edges) => { let iter = (0..edges.len()).into_par_iter().map(EID); let filtered = match view.filter_state() { - FilterState::Neither => { - FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref())) - } + FilterState::Neither => FilterVariants::Neither( + iter.map(move |eid| edges.get_mem(eid).as_edge_ref()), + ), FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); (view.filter_edge(e, view.layer_ids()) && view.filter_node(nodes.node_entry(e.src()), view.layer_ids()) && view.filter_node(nodes.node_entry(e.dst()), view.layer_ids())) .then(|| e.out_ref()) })), FilterState::Nodes => FilterVariants::Nodes(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); (view.filter_node(nodes.node_entry(e.src()), view.layer_ids()) && view.filter_node(nodes.node_entry(e.dst()), view.layer_ids())) .then(|| e.out_ref()) })), FilterState::Edges | FilterState::BothIndependent => { FilterVariants::Edges(iter.filter_map(move |e| { - let e = EdgeStorageRef::Mem(edges.get(e)); + let e = EdgeStorageRef::Mem(edges.get_mem(e)); view.filter_edge(e, view.layer_ids()).then(|| e.out_ref()) })) }