Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the edge storage more columnar to reduce memory usage #1670

Merged
merged 12 commits into from
Jul 2, 2024
2 changes: 1 addition & 1 deletion pometry-storage-private
fabianmurariu marked this conversation as resolved.
Show resolved Hide resolved
78 changes: 0 additions & 78 deletions raphtory/src/core/entities/graph/edges.rs

This file was deleted.

1 change: 0 additions & 1 deletion raphtory/src/core/entities/graph/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod edges;
pub mod tgraph;
pub mod tgraph_storage;
pub(crate) mod timer;
Expand Down
17 changes: 8 additions & 9 deletions raphtory/src/core/entities/graph/tgraph_storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::edges::{EdgesStorage, LockedEdges};
use crate::core::{
entities::{edges::edge_store::EdgeStore, nodes::node_store::NodeStore, EID, VID},
storage::{
self,
raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard},
raw_edges::{EdgeArcGuard, EdgeRGuard, EdgeWGuard, EdgesStorage, LockedEdges},
Entry, EntryMut, PairEntryMut,
},
};
Expand Down Expand Up @@ -50,7 +49,7 @@ impl GraphStorage {
}
#[inline]
pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard {
self.edges.push(edge)
self.edges.push_edge(edge)
}

#[inline]
Expand All @@ -59,8 +58,8 @@ impl GraphStorage {
}

#[inline]
pub(crate) fn get_edge_mut(&self, id: EID) -> EdgeWGuard {
self.edges.entry_mut(id)
pub(crate) fn get_edge_mut(&self, eid: EID) -> EdgeWGuard {
self.edges.get_edge_mut(eid)
}

#[inline]
Expand All @@ -69,13 +68,13 @@ impl GraphStorage {
}

#[inline]
pub(crate) fn edge_entry(&self, id: EID) -> EdgeRGuard {
self.edges.entry(id)
pub(crate) fn edge_entry(&self, eid: EID) -> EdgeRGuard {
self.edges.get_edge(eid)
}

#[inline]
pub(crate) fn get_edge_arc(&self, id: EID) -> EdgeArcGuard {
self.edges.entry_arc(id)
pub(crate) fn get_edge_arc(&self, eid: EID) -> EdgeArcGuard {
self.edges.get_edge_arc(eid)
}

#[inline]
Expand Down
41 changes: 31 additions & 10 deletions raphtory/src/core/storage/raw_edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ impl EdgeShard {
pub const SHARD_SIZE: usize = 64;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgesShard {
pub struct EdgesStorage {
shards: Arc<[Arc<RwLock<EdgeShard>>]>,
len: Arc<AtomicUsize>,
}

impl PartialEq for EdgesShard {
impl PartialEq for EdgesStorage {
fn eq(&self, other: &Self) -> bool {
self.shards.len() == other.shards.len()
&& self
Expand All @@ -86,7 +86,7 @@ impl PartialEq for EdgesShard {
}
}

impl EdgesShard {
impl EdgesStorage {
pub fn new() -> Self {
let mut shards = (0..SHARD_SIZE).into_iter().map(|_| {
Arc::new(RwLock::new(EdgeShard {
Expand All @@ -96,7 +96,7 @@ impl EdgesShard {
deletions: Vec::with_capacity(0),
}))
});
EdgesShard {
EdgesStorage {
shards: shards.collect(),
len: Arc::new(AtomicUsize::new(0)),
}
Expand All @@ -107,8 +107,29 @@ impl EdgesShard {
self.len.load(atomic::Ordering::SeqCst)
}

pub fn lock(&self) -> LockedEdgesShard {
LockedEdgesShard {
/// Stores `edge` via `push` and returns a write guard for the stored edge.
///
/// `push` returns the `EID` assigned to the edge together with its write
/// guard; the id is written back into the stored edge's `eid` field before
/// the guard is handed to the caller.
pub(crate) fn push_edge(&self, edge: EdgeStore) -> EdgeWGuard {
let (eid, mut edge) = self.push(edge);
// Record the assigned id on the stored edge itself.
edge.edge_store_mut().eid = eid;
edge
}

// #[inline]
// pub(crate) fn entry(&self, id: EID) -> EdgeRGuard {
// self.get_edge(id)
// }

// #[inline]
// pub(crate) fn entry_mut(&self, id: EID) -> EdgeWGuard {
// self.get_edge_mut(id)
// }

// #[inline]
// pub(crate) fn entry_arc(&self, id: EID) -> EdgeArcGuard {
// self.get_edge_arc(id)
// }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You left in some commented-out code (the `entry`, `entry_mut`, and `entry_arc` wrappers above).


pub fn read_lock(&self) -> LockedEdges {
LockedEdges {
shards: self.shards.iter().map(|shard| shard.read_arc()).collect(),
len: self.len(),
}
Expand All @@ -119,8 +140,8 @@ impl EdgesShard {
resolve(index, self.shards.len())
}

pub fn push(&self, mut value: EdgeStore) -> (EID, EdgeWGuard) {
let index = self.len.fetch_add(1, atomic::Ordering::SeqCst);
fn push(&self, mut value: EdgeStore) -> (EID, EdgeWGuard) {
let index = self.len.fetch_add(1, atomic::Ordering::Relaxed);
let (bucket, offset) = self.resolve(index);
let mut shard = self.shards[bucket].write();
shard.insert(offset, value);
Expand Down Expand Up @@ -263,12 +284,12 @@ impl<'a> EdgeRGuard<'a> {
}

#[derive(Debug)]
pub struct LockedEdgesShard {
pub struct LockedEdges {
shards: Arc<[ArcRwLockReadGuard<parking_lot::RawRwLock, EdgeShard>]>,
len: usize,
}

impl LockedEdgesShard {
impl LockedEdges {
pub fn get(&self, eid: EID) -> &EdgeShard {
let (bucket, offset) = resolve(eid.into(), self.shards.len());
let shard = &self.shards[bucket];
Expand Down
31 changes: 12 additions & 19 deletions raphtory/src/db/api/storage/edges/edges.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
use std::sync::Arc;

use rayon::iter::ParallelIterator;

#[cfg(not(feature = "storage"))]
use either::Either;

#[cfg(feature = "storage")]
use crate::db::api::storage::variants::storage_variants3::StorageVariants;
#[cfg(feature = "storage")]
use crate::disk_graph::storage_interface::edges::DiskEdges;
use super::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges};
#[cfg(feature = "storage")]
use crate::disk_graph::storage_interface::edges_ref::DiskEdgesRef;
use crate::disk_graph::storage_interface::{edges::DiskEdges, edges_ref::DiskEdgesRef};
use crate::{
core::entities::{graph::edges::LockedEdges, LayerIds},
db::api::storage::{edges::edge_storage_ops::EdgeStorageOps, nodes::unlocked::UnlockedEdges},
core::{entities::LayerIds, storage::raw_edges::LockedEdges},
db::api::storage::edges::edge_storage_ops::EdgeStorageOps,
};

use super::edge_entry::EdgeStorageEntry;
use crate::db::api::storage::variants::storage_variants3::StorageVariants;

use rayon::iter::ParallelIterator;
use std::sync::Arc;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are still some blank lines between the `use` groups, which means the imports didn't get fully simplified.

pub enum EdgesStorage {
Mem(Arc<LockedEdges>),
Expand Down Expand Up @@ -68,13 +61,13 @@ impl<'a> EdgesStorageRef<'a> {
#[cfg(not(feature = "storage"))]
pub fn iter(self, layers: LayerIds) -> impl Iterator<Item = EdgeStorageEntry<'a>> {
match self {
EdgesStorageRef::Mem(storage) => Either::Left(
EdgesStorageRef::Mem(storage) => StorageVariants::Mem(
storage
.iter()
.filter(move |e| e.has_layer(&layers))
.map(EdgeStorageEntry::Mem),
),
EdgesStorageRef::Unlocked(edges) => Either::Right(
EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked(
edges
.iter()
.filter(move |e| e.as_mem_edge().has_layer(&layers))
Expand Down Expand Up @@ -107,13 +100,13 @@ impl<'a> EdgesStorageRef<'a> {
#[cfg(not(feature = "storage"))]
pub fn par_iter(self, layers: LayerIds) -> impl ParallelIterator<Item = EdgeStorageEntry<'a>> {
match self {
EdgesStorageRef::Mem(storage) => Either::Left(
EdgesStorageRef::Mem(storage) => StorageVariants::Mem(
storage
.par_iter()
.filter(move |e| e.has_layer(&layers))
.map(EdgeStorageEntry::Mem),
),
EdgesStorageRef::Unlocked(edges) => Either::Right(
EdgesStorageRef::Unlocked(edges) => StorageVariants::Unlocked(
edges
.par_iter()
.filter(move |e| e.as_mem_edge().has_layer(&layers))
Expand Down
1 change: 1 addition & 0 deletions raphtory/src/db/api/storage/edges/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod edge_owned_entry;
pub mod edge_ref;
pub mod edge_storage_ops;
pub mod edges;
pub mod unlocked;
4 changes: 2 additions & 2 deletions raphtory/src/db/api/storage/locked.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;

use crate::core::{
entities::{graph::edges::LockedEdges, nodes::node_store::NodeStore, VID},
storage::ReadLockedStorage,
entities::{nodes::node_store::NodeStore, VID},
storage::{raw_edges::LockedEdges, ReadLockedStorage},
};

#[derive(Debug)]
Expand Down
1 change: 0 additions & 1 deletion raphtory/src/db/api/storage/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@ pub mod node_ref;
pub mod node_storage_ops;
pub mod nodes;
pub mod nodes_ref;
pub mod unlocked;
30 changes: 15 additions & 15 deletions raphtory/src/db/api/storage/storage_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use crate::{
use pometry_storage::graph::TemporalGraph;

use super::{
edges::edge_entry::EdgeStorageEntry,
nodes::{node_entry::NodeStorageEntry, unlocked::UnlockedEdges},
edges::{edge_entry::EdgeStorageEntry, unlocked::UnlockedEdges},
nodes::node_entry::NodeStorageEntry,
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -129,7 +129,7 @@ impl GraphStorage {

pub fn edge(&self, eid: EdgeRef) -> EdgeStorageEntry {
match self {
GraphStorage::Mem(storage) => EdgeStorageEntry::Mem(storage.edges.get(eid.pid())),
GraphStorage::Mem(storage) => EdgeStorageEntry::Mem(storage.edges.get_mem(eid.pid())),
GraphStorage::Unlocked(storage) => {
EdgeStorageEntry::Unlocked(storage.inner().storage.edge_entry(eid.pid()))
}
Expand Down Expand Up @@ -277,25 +277,25 @@ impl GraphStorage {
EdgesStorage::Mem(edges) => {
let iter = (0..edges.len()).map(EID);
let filtered = match view.filter_state() {
FilterState::Neither => {
FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref()))
}
FilterState::Neither => FilterVariants::Neither(
iter.map(move |eid| edges.get_mem(eid).as_edge_ref()),
),
FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
(view.filter_edge(e, view.layer_ids())
&& view.filter_node(nodes.node_entry(e.src()), view.layer_ids())
&& view.filter_node(nodes.node_entry(e.dst()), view.layer_ids()))
.then(|| e.out_ref())
})),
FilterState::Nodes => FilterVariants::Nodes(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
(view.filter_node(nodes.node_entry(e.src()), view.layer_ids())
&& view.filter_node(nodes.node_entry(e.dst()), view.layer_ids()))
.then(|| e.out_ref())
})),
FilterState::Edges | FilterState::BothIndependent => {
FilterVariants::Edges(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
view.filter_edge(e, view.layer_ids()).then(|| e.out_ref())
}))
}
Expand Down Expand Up @@ -404,25 +404,25 @@ impl GraphStorage {
EdgesStorage::Mem(edges) => {
let iter = (0..edges.len()).into_par_iter().map(EID);
let filtered = match view.filter_state() {
FilterState::Neither => {
FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref()))
}
FilterState::Neither => FilterVariants::Neither(
iter.map(move |eid| edges.get_mem(eid).as_edge_ref()),
),
FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
(view.filter_edge(e, view.layer_ids())
&& view.filter_node(nodes.node_entry(e.src()), view.layer_ids())
&& view.filter_node(nodes.node_entry(e.dst()), view.layer_ids()))
.then(|| e.out_ref())
})),
FilterState::Nodes => FilterVariants::Nodes(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
(view.filter_node(nodes.node_entry(e.src()), view.layer_ids())
&& view.filter_node(nodes.node_entry(e.dst()), view.layer_ids()))
.then(|| e.out_ref())
})),
FilterState::Edges | FilterState::BothIndependent => {
FilterVariants::Edges(iter.filter_map(move |e| {
let e = EdgeStorageRef::Mem(edges.get(e));
let e = EdgeStorageRef::Mem(edges.get_mem(e));
view.filter_edge(e, view.layer_ids()).then(|| e.out_ref())
}))
}
Expand Down
Loading