Skip to content

Commit

Permalink
Write-locked graph storage implementation for faster bulk loaders (#1741
Browse files Browse the repository at this point in the history
)

* write locked graph implementation

* wip implementing edge resolution

* load_edges_from_df is passing all tests in rust with new implementation

* fix warnings

* don't hit the map so hard

* track the time updates on node and graph

* fix warning

* move the buffers out of the loop and don't allocate storage for edge properties if there are none

* resolution improvements in the batch loader

* remove unused iter in NodeColOps trait as it was slower than calling get

* remove unused imports

* fmt

* get the lock outside the loop

* add cache to df loaders

* add proptest for df loader

* add test for cache and fix silly bug

* clean up unused resolve_node_no_init and update bytemuck version

* new is not used again
  • Loading branch information
ljeub-pometry authored Sep 6, 2024
1 parent b317933 commit 51bb68c
Show file tree
Hide file tree
Showing 38 changed files with 1,005 additions and 205 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ wasm-bindgen-test = "0.3.41"
memmap2 = { version = "0.9.4" }
ahash = { version = "0.8.3", features = ["serde"] }
strum = { version = "0.26.1", features = ["derive"] }
bytemuck = { version = "1.15.0" }
bytemuck = { version = "1.18.0", features = ["derive"] }
ouroboros = "0.18.3"
url = "2.2"
base64-compat = { package = "base64-compat", version = "1.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ python-fmt:

tidy: rust-fmt stubs python-fmt

build-python:
build-python: activate-storage
cd python && maturin develop -r --features storage

python-docs:
Expand Down
2 changes: 1 addition & 1 deletion pometry-storage-private
1 change: 1 addition & 0 deletions raphtory-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ edition.workspace = true

[dependencies]
serde = { workspace = true, features = ["derive"] }
bytemuck = { workspace = true }
chrono.workspace = true
dashmap = { workspace = true }
rustc-hash = { workspace = true }
Expand Down
24 changes: 24 additions & 0 deletions raphtory-api/src/atomic_extra/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::sync::atomic::{AtomicU64, AtomicUsize};

/// Construct atomic slice from mut slice (reimplementation of currently unstable feature)
#[inline]
pub fn atomic_usize_from_mut_slice(v: &mut [usize]) -> &mut [AtomicUsize] {
use std::mem::align_of;
let [] = [(); align_of::<AtomicUsize>() - align_of::<usize>()];
// SAFETY:
// - the mutable reference guarantees unique ownership.
// - the alignment of `usize` and `AtomicUsize` is the
// same, as verified above.
unsafe { &mut *(v as *mut [usize] as *mut [AtomicUsize]) }
}

#[inline]
pub fn atomic_u64_from_mut_slice(v: &mut [u64]) -> &mut [AtomicU64] {
use std::mem::align_of;
let [] = [(); align_of::<AtomicU64>() - align_of::<u64>()];
// SAFETY:
// - the mutable reference guarantees unique ownership.
// - the alignment of `u64` and `AtomicU64` is the
// same, as verified above.
unsafe { &mut *(v as *mut [u64] as *mut [AtomicU64]) }
}
52 changes: 48 additions & 4 deletions raphtory-api/src/core/entities/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use self::edges::edge_ref::EdgeRef;
use super::input::input_node::parse_u64_strict;
use bytemuck::{Pod, Zeroable};
use num_traits::ToPrimitive;
use serde::{Deserialize, Serialize};
use std::{
Expand All @@ -12,10 +13,16 @@ pub mod edges;
// the only reason this is public is because the physical ids of the nodes don't move
#[repr(transparent)]
#[derive(
Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default,
Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Pod, Zeroable,
)]
pub struct VID(pub usize);

impl Default for VID {
fn default() -> Self {
VID(usize::MAX)
}
}

impl VID {
pub fn index(&self) -> usize {
self.0
Expand All @@ -40,10 +47,16 @@ impl From<VID> for usize {

#[repr(transparent)]
#[derive(
Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default,
Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Pod, Zeroable,
)]
pub struct EID(pub usize);

impl Default for EID {
fn default() -> Self {
EID(usize::MAX)
}
}

impl EID {
pub fn as_u64(self) -> u64 {
self.0 as u64
Expand Down Expand Up @@ -105,7 +118,7 @@ pub enum GID {

impl Default for GID {
fn default() -> Self {
GID::U64(0)
GID::U64(u64::MAX)
}
}

Expand All @@ -119,6 +132,12 @@ impl Display for GID {
}

impl GID {
pub fn dtype(&self) -> GidType {
match self {
GID::U64(_) => GidType::U64,
GID::Str(_) => GidType::Str,
}
}
pub fn into_str(self) -> Option<String> {
match self {
GID::Str(v) => Some(v),
Expand Down Expand Up @@ -147,7 +166,7 @@ impl GID {
}
}

pub fn to_str(&self) -> Cow<String> {
pub fn to_str(&self) -> Cow<str> {
match self {
GID::U64(v) => Cow::Owned(v.to_string()),
GID::Str(v) => Cow::Borrowed(v),
Expand Down Expand Up @@ -202,6 +221,25 @@ pub enum GidRef<'a> {
Str(&'a str),
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum GidType {
U64,
Str,
}

impl Display for GidType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
GidType::U64 => {
write!(f, "Numeric")
}
GidType::Str => {
write!(f, "String")
}
}
}
}

impl Display for GidRef<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Expand All @@ -221,6 +259,12 @@ impl<'a> From<&'a GID> for GidRef<'a> {
}

impl<'a> GidRef<'a> {
pub fn dtype(self) -> GidType {
match self {
GidRef::U64(_) => GidType::U64,
GidRef::Str(_) => GidType::Str,
}
}
pub fn as_str(self) -> Option<&'a str> {
match self {
GidRef::Str(s) => Some(s),
Expand Down
2 changes: 1 addition & 1 deletion raphtory-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod atomic_extra;
pub mod core;

#[cfg(feature = "python")]
pub mod python;
3 changes: 2 additions & 1 deletion raphtory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ serde_json = { workspace = true }
ouroboros = { workspace = true }
either = { workspace = true }
kdam = { workspace = true }

bytemuck = { workspace = true }

# io optional dependencies
csv = { workspace = true, optional = true }
Expand Down Expand Up @@ -73,6 +73,7 @@ pometry-storage = { workspace = true, optional = true }
prost = { workspace = true, optional = true }
prost-types = { workspace = true, optional = true }


[target.'cfg(target_os = "macos")'.dependencies]
snmalloc-rs = { workspace = true }

Expand Down
4 changes: 4 additions & 0 deletions raphtory/src/core/entities/edges/edge_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ impl EdgeStore {
}
}

pub fn initialised(&self) -> bool {
self.eid != EID::default()
}

pub fn as_edge_ref(&self) -> EdgeRef {
EdgeRef::new_outgoing(self.eid, self.src, self.dst)
}
Expand Down
47 changes: 45 additions & 2 deletions raphtory/src/core/entities/graph/logical_to_physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use dashmap::mapref::entry::Entry;
use either::Either;
use once_cell::sync::OnceCell;
use raphtory_api::core::{
entities::{GidRef, VID},
entities::{GidRef, GidType, VID},
storage::{dict_mapper::MaybeNew, FxDashMap},
};
use serde::{Deserialize, Deserializer, Serialize};
Expand Down Expand Up @@ -47,6 +47,12 @@ pub(crate) struct Mapping {
}

impl Mapping {
pub fn dtype(&self) -> Option<GidType> {
self.map.get().map(|map| match map {
Map::U64(_) => GidType::U64,
Map::Str(_) => GidType::Str,
})
}
pub fn new() -> Self {
Mapping {
map: OnceCell::new(),
Expand All @@ -69,7 +75,44 @@ impl Mapping {
.ok_or_else(|| MutateGraphError::InvalidNodeId(gid.into()).into())
}

pub fn get_or_init<'a>(
pub fn get_or_init(
&self,
gid: GidRef,
next_id: impl FnOnce() -> VID,
) -> Result<MaybeNew<VID>, GraphError> {
let map = self.map.get_or_init(|| match &gid {
GidRef::U64(_) => Map::U64(FxDashMap::default()),
GidRef::Str(_) => Map::Str(FxDashMap::default()),
});
let vid = match gid {
GidRef::U64(id) => map.as_u64().map(|m| match m.entry(id) {
Entry::Occupied(id) => MaybeNew::Existing(*id.get()),
Entry::Vacant(entry) => {
let vid = next_id();
entry.insert(vid);
MaybeNew::New(vid)
}
}),
GidRef::Str(id) => map.as_str().map(|m| {
m.get(id)
.map(|vid| MaybeNew::Existing(*vid))
.unwrap_or_else(|| match m.entry(id.to_owned()) {
Entry::Occupied(entry) => MaybeNew::Existing(*entry.get()),
Entry::Vacant(entry) => {
let vid = next_id();
entry.insert(vid);
MaybeNew::New(vid)
}
})
}),
};

vid.ok_or_else(|| GraphError::FailedToMutateGraph {
source: MutateGraphError::InvalidNodeId(gid.into()),
})
}

pub fn get_or_init_node<'a>(
&self,
gid: GidRef,
f_init: impl FnOnce() -> UninitialisedEntry<'a, NodeStore>,
Expand Down
25 changes: 12 additions & 13 deletions raphtory/src/core/entities/graph/tgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ use crate::{
LayerIds, EID, VID,
},
storage::{
raw_edges::EdgeWGuard,
raw_edges::MutEdge,
timeindex::{AsTime, TimeIndexEntry},
PairEntryMut,
},
utils::errors::GraphError,
Direction, Prop,
},
db::api::{storage::graph::edges::edge_storage_ops::EdgeStorageOps, view::Layer},
DEFAULT_NUM_SHARDS,
};
use dashmap::DashSet;
use either::Either;
Expand Down Expand Up @@ -81,7 +80,7 @@ impl std::fmt::Display for TemporalGraph {

impl Default for TemporalGraph {
fn default() -> Self {
Self::new(DEFAULT_NUM_SHARDS)
Self::new(rayon::current_num_threads())
}
}

Expand Down Expand Up @@ -344,7 +343,7 @@ impl TemporalGraph {
eid: EID,
t: TimeIndexEntry,
layer: usize,
edge_fn: impl FnOnce(&mut EdgeWGuard) -> Result<(), GraphError>,
edge_fn: impl FnOnce(MutEdge) -> Result<(), GraphError>,
) -> Result<(), GraphError> {
let (src, dst) = {
let edge_r = self.storage.edges.get_edge(eid);
Expand All @@ -357,10 +356,10 @@ impl TemporalGraph {
self.link_nodes_inner(&mut node_pair, eid, t, layer)?;
}
let mut edge_w = self.storage.edges.get_edge_mut(eid);
edge_fn(&mut edge_w)
edge_fn(edge_w.as_mut())
}

pub(crate) fn link_nodes<F: FnOnce(&mut EdgeWGuard) -> Result<(), GraphError>>(
pub(crate) fn link_nodes<F: FnOnce(MutEdge) -> Result<(), GraphError>>(
&self,
src_id: VID,
dst_id: VID,
Expand All @@ -371,12 +370,12 @@ impl TemporalGraph {
let edge = {
let mut node_pair = self.storage.pair_node_mut(src_id, dst_id);
let src = node_pair.get_i();
let edge = match src.find_edge_eid(dst_id, &LayerIds::All) {
let mut edge = match src.find_edge_eid(dst_id, &LayerIds::All) {
Some(edge_id) => Either::Left(self.storage.get_edge_mut(edge_id)),
None => Either::Right(self.storage.push_edge(EdgeStore::new(src_id, dst_id))),
};
let eid = match edge.as_ref() {
Either::Left(edge) => edge.edge_store().eid,
let eid = match edge.as_mut() {
Either::Left(edge) => edge.as_ref().eid(),
Either::Right(edge) => edge.value().eid,
};
self.link_nodes_inner(&mut node_pair, eid, t, layer)?;
Expand All @@ -385,13 +384,13 @@ impl TemporalGraph {

match edge {
Either::Left(mut edge) => {
edge_fn(&mut edge)?;
Ok(MaybeNew::Existing(edge.edge_store().eid))
edge_fn(edge.as_mut())?;
Ok(MaybeNew::Existing(edge.as_ref().eid()))
}
Either::Right(edge) => {
let mut edge = edge.init();
edge_fn(&mut edge)?;
Ok(MaybeNew::New(edge.edge_store().eid))
edge_fn(edge.as_mut())?;
Ok(MaybeNew::New(edge.as_ref().eid()))
}
}
}
Expand Down
Loading

0 comments on commit 51bb68c

Please sign in to comment.