Skip to content

Commit

Permalink
fixes as per review
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Jul 2, 2024
1 parent 591dcec commit 2f7cb9b
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 135 deletions.
78 changes: 0 additions & 78 deletions raphtory/src/core/entities/graph/edges.rs

This file was deleted.

1 change: 0 additions & 1 deletion raphtory/src/core/entities/graph/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod edges;
pub mod tgraph;
pub mod tgraph_storage;
pub(crate) mod timer;
Expand Down
17 changes: 8 additions & 9 deletions raphtory/src/core/entities/graph/tgraph_storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::edges::{EdgesStorage, LockedEdges};
use crate::core::{
entities::{edges::edge_store::EdgeStore, nodes::node_store::NodeStore, EID, VID},
storage::{
self,
raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard},
raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard, EdgesStorage, LockedEdges},
Entry, EntryMut, PairEntryMut,
},
};
Expand Down Expand Up @@ -50,7 +49,7 @@ impl GraphStorage {
}
#[inline]
pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard {
self.edges.push(edge)
self.edges.push_edge(edge)
}

#[inline]
Expand All @@ -59,8 +58,8 @@ impl GraphStorage {
}

#[inline]
pub(crate) fn get_edge_mut(&self, id: EID) -> EdgeWGuard {
self.edges.entry_mut(id)
pub(crate) fn get_edge_mut(&self, eid: EID) -> EdgeWGuard {
self.edges.get_edge_mut(eid)
}

#[inline]
Expand All @@ -69,13 +68,13 @@ impl GraphStorage {
}

#[inline]
pub(crate) fn edge_entry(&self, id: EID) -> EdgeRGuard {
self.edges.entry(id)
pub(crate) fn edge_entry(&self, eid: EID) -> EdgeRGuard {
self.edges.get_edge(eid)
}

#[inline]
pub(crate) fn get_edge_arc(&self, id: EID) -> EdgeArcGuard {
self.edges.entry_arc(id)
pub(crate) fn get_edge_arc(&self, eid: EID) -> EdgeArcGuard {
self.edges.get_edge_arc(eid)
}

#[inline]
Expand Down
41 changes: 31 additions & 10 deletions raphtory/src/core/storage/raw_edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ impl EdgeShard {
pub const SHARD_SIZE: usize = 64;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgesShard {
pub struct EdgesStorage {
shards: Arc<[Arc<RwLock<EdgeShard>>]>,
len: Arc<AtomicUsize>,
}

impl PartialEq for EdgesShard {
impl PartialEq for EdgesStorage {
fn eq(&self, other: &Self) -> bool {
self.shards.len() == other.shards.len()
&& self
Expand All @@ -86,7 +86,7 @@ impl PartialEq for EdgesShard {
}
}

impl EdgesShard {
impl EdgesStorage {
pub fn new() -> Self {
let mut shards = (0..SHARD_SIZE).into_iter().map(|_| {
Arc::new(RwLock::new(EdgeShard {
Expand All @@ -96,7 +96,7 @@ impl EdgesShard {
deletions: Vec::with_capacity(0),
}))
});
EdgesShard {
EdgesStorage {
shards: shards.collect(),
len: Arc::new(AtomicUsize::new(0)),
}
Expand All @@ -107,8 +107,29 @@ impl EdgesShard {
self.len.load(atomic::Ordering::SeqCst)
}

pub fn lock(&self) -> LockedEdgesShard {
LockedEdgesShard {
pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard {
let (eid, mut edge) = self.push(edge);
edge.edge_store_mut().eid = eid;
edge
}

// #[inline]
// pub(crate) fn entry(&self, id: EID) -> EdgeRGuard {
// self.get_edge(id)
// }

// #[inline]
// pub(crate) fn entry_mut(&self, id: EID) -> EdgeWGuard {
// self.get_edge_mut(id)
// }

// #[inline]
// pub(crate) fn entry_arc(&self, id: EID) -> EdgeArcGuard {
// self.get_edge_arc(id)
// }

pub fn read_lock(&self) -> LockedEdges {
LockedEdges {
shards: self.shards.iter().map(|shard| shard.read_arc()).collect(),
len: self.len(),
}
Expand All @@ -119,8 +140,8 @@ impl EdgesShard {
resolve(index, self.shards.len())
}

pub fn push(&self, mut value: EdgeStore) -> (EID, EdgeWGuard) {
let index = self.len.fetch_add(1, atomic::Ordering::SeqCst);
fn push(&self, mut value: EdgeStore) -> (EID, EdgeWGuard) {
let index = self.len.fetch_add(1, atomic::Ordering::Relaxed);
let (bucket, offset) = self.resolve(index);
let mut shard = self.shards[bucket].write();
shard.insert(offset, value);
Expand Down Expand Up @@ -263,12 +284,12 @@ impl<'a> EdgeRGuard<'a> {
}

#[derive(Debug)]
pub struct LockedEdgesShard {
pub struct LockedEdges {
shards: Arc<[ArcRwLockReadGuard<parking_lot::RawRwLock, EdgeShard>]>,
len: usize,
}

impl LockedEdgesShard {
impl LockedEdges {
pub fn get(&self, eid: EID) -> &EdgeShard {
let (bucket, offset) = resolve(eid.into(), self.shards.len());
let shard = &self.shards[bucket];
Expand Down
31 changes: 12 additions & 19 deletions raphtory/src/db/api/storage/edges/edges.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
use std::sync::Arc;

use rayon::iter::ParallelIterator;

#[cfg(not(feature = "storage"))]
use either::Either;

#[cfg(feature = "storage")]
use crate::db::api::storage::variants::storage_variants3::StorageVariants;
#[cfg(feature = "storage")]
use crate::disk_graph::storage_interface::edges::DiskEdges;
use super::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges};
#[cfg(feature = "storage")]
use crate::disk_graph::storage_interface::edges_ref::DiskEdgesRef;
use crate::disk_graph::storage_interface::{edges::DiskEdges, edges_ref::DiskEdgesRef};
use crate::{
core::entities::{graph::edges::LockedEdges, LayerIds},
db::api::storage::{edges::edge_storage_ops::EdgeStorageOps, nodes::unlocked::UnlockedEdges},
core::{entities::LayerIds, storage::raw_edges::LockedEdges},
db::api::storage::edges::edge_storage_ops::EdgeStorageOps,
};

use super::edge_entry::EdgeStorageEntry;
use crate::db::api::storage::variants::storage_variants3::StorageVariants;

use rayon::iter::ParallelIterator;
use std::sync::Arc;

pub enum EdgesStorage {
Mem(Arc<LockedEdges>),
Expand Down Expand Up @@ -68,13 +61,13 @@ impl<'a> EdgesStorageRef<'a> {
#[cfg(not(feature = "storage"))]
pub fn iter(self, layers: LayerIds) -> impl Iterator<Item = EdgeStorageEntry<'a>> {
match self {
EdgesStorageRef::Mem(storage) => Either::Left(
EdgesStorageRef::Mem(storage) => StorageVariants::Mem(
storage
.iter()
.filter(move |e| e.has_layer(&layers))
.map(EdgeStorageEntry::Mem),
),
EdgesStorageRef::Unlocked(edges) => Either::Right(
EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked(
edges
.iter()
.filter(move |e| e.as_mem_edge().has_layer(&layers))
Expand Down Expand Up @@ -107,13 +100,13 @@ impl<'a> EdgesStorageRef<'a> {
#[cfg(not(feature = "storage"))]
pub fn par_iter(self, layers: LayerIds) -> impl ParallelIterator<Item = EdgeStorageEntry<'a>> {
match self {
EdgesStorageRef::Mem(storage) => Either::Left(
EdgesStorageRef::Mem(storage) => StorageVariants::Mem(
storage
.par_iter()
.filter(move |e| e.has_layer(&layers))
.map(EdgeStorageEntry::Mem),
),
EdgesStorageRef::Unlocked(edges) => Either::Right(
EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked(
edges
.par_iter()
.filter(move |e| e.as_mem_edge().has_layer(&layers))
Expand Down
1 change: 1 addition & 0 deletions raphtory/src/db/api/storage/edges/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod edge_owned_entry;
pub mod edge_ref;
pub mod edge_storage_ops;
pub mod edges;
pub mod unlocked;
4 changes: 2 additions & 2 deletions raphtory/src/db/api/storage/locked.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;

use crate::core::{
entities::{graph::edges::LockedEdges, nodes::node_store::NodeStore, VID},
storage::ReadLockedStorage,
entities::{nodes::node_store::NodeStore, VID},
storage::{raw_edges::LockedEdges, ReadLockedStorage},
};

#[derive(Debug)]
Expand Down
1 change: 0 additions & 1 deletion raphtory/src/db/api/storage/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@ pub mod node_ref;
pub mod node_storage_ops;
pub mod nodes;
pub mod nodes_ref;
pub mod unlocked;
30 changes: 15 additions & 15 deletions raphtory/src/db/api/storage/storage_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use crate::{
use pometry_storage::graph::TemporalGraph;

use super::{
edges::edge_entry::EdgeStorageEntry,
nodes::{node_entry::NodeStorageEntry, unlocked::UnlockedEdges},
edges::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges},
nodes::node_entry::NodeStorageEntry,
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -129,7 +129,7 @@ impl GraphStorage {

pub fn edge(&self, eid: EdgeRef) -> EdgeStorageEntry {
match self {
GraphStorage::Mem(storage) => EdgeStorageEntry::Mem(storage.edges.get(eid.pid())),
GraphStorage::Mem(storage) => EdgeStorageEntry::Mem(storage.edges.get_mem(eid.pid())),
GraphStorage::Unlocked(storage) => {
EdgeStorageEntry::Unlocked(storage.inner().storage.edge_entry(eid.pid()))
}
Expand Down Expand Up @@ -277,25 +277,25 @@ impl GraphStorage {
EdgesStorage::Mem(edges) => {
let iter = (0..edges.len()).map(EID);
let filtered = match view.filter_state() {
FilterState::Neither => {
FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref()))
}
FilterState::Neither => FilterVariants::Neither(
iter.map(move |eid| edges.get_mem(eid).as_edge_ref()),
),
FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
(view.filter_edge(e, view.layer_ids())
&& view.filter_node(nodes.node_entry(e.src()), view.layer_ids())
&& view.filter_node(nodes.node_entry(e.dst()), view.layer_ids()))
.then(|| e.out_ref())
})),
FilterState::Nodes => FilterVariants::Nodes(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
(view.filter_node(nodes.node_entry(e.src()), view.layer_ids())
&& view.filter_node(nodes.node_entry(e.dst()), view.layer_ids()))
.then(|| e.out_ref())
})),
FilterState::Edges | FilterState::BothIndependent => {
FilterVariants::Edges(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
view.filter_edge(e, view.layer_ids()).then(|| e.out_ref())
}))
}
Expand Down Expand Up @@ -404,25 +404,25 @@ impl GraphStorage {
EdgesStorage::Mem(edges) => {
let iter = (0..edges.len()).into_par_iter().map(EID);
let filtered = match view.filter_state() {
FilterState::Neither => {
FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref()))
}
FilterState::Neither => FilterVariants::Neither(
iter.map(move |eid| edges.get_mem(eid).as_edge_ref()),
),
FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
(view.filter_edge(e, view.layer_ids())
&& view.filter_node(nodes.node_entry(e.src()), view.layer_ids())
&& view.filter_node(nodes.node_entry(e.dst()), view.layer_ids()))
.then(|| e.out_ref())
})),
FilterState::Nodes => FilterVariants::Nodes(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
(view.filter_node(nodes.node_entry(e.src()), view.layer_ids())
&& view.filter_node(nodes.node_entry(e.dst()), view.layer_ids()))
.then(|| e.out_ref())
})),
FilterState::Edges | FilterState::BothIndependent => {
FilterVariants::Edges(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
view.filter_edge(e, view.layer_ids()).then(|| e.out_ref())
}))
}
Expand Down

0 comments on commit 2f7cb9b

Please sign in to comment.