diff --git a/Cargo.lock b/Cargo.lock index 9a73e5e1e..3a37f998d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -840,9 +840,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.17.1" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773d90827bc3feecfb67fab12e24de0749aad83c74b9504ecde46237b5cd24e2" +checksum = "94bbb0ad554ad961ddc5da507a12a29b14e4ae5bda06b19f575a3e6079d2e2ae" dependencies = [ "bytemuck_derive", ] @@ -4344,6 +4344,7 @@ dependencies = [ "async-openai", "async-trait", "bincode", + "bytemuck", "bzip2", "chrono", "csv", @@ -4402,6 +4403,7 @@ dependencies = [ name = "raphtory-api" version = "0.11.2" dependencies = [ + "bytemuck", "chrono", "dashmap 6.0.1", "lock_api", diff --git a/Cargo.toml b/Cargo.toml index 9ad496a1b..062466032 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,7 +119,7 @@ wasm-bindgen-test = "0.3.41" memmap2 = { version = "0.9.4" } ahash = { version = "0.8.3", features = ["serde"] } strum = { version = "0.26.1", features = ["derive"] } -bytemuck = { version = "1.15.0" } +bytemuck = { version = "1.18.0", features = ["derive"] } ouroboros = "0.18.3" url = "2.2" base64-compat = { package = "base64-compat", version = "1.0.0" } diff --git a/Makefile b/Makefile index b6e4832e6..dc83b61cf 100644 --- a/Makefile +++ b/Makefile @@ -54,7 +54,7 @@ python-fmt: tidy: rust-fmt stubs python-fmt -build-python: +build-python: activate-storage cd python && maturin develop -r --features storage python-docs: diff --git a/pometry-storage-private b/pometry-storage-private index 942230d44..f94a7ee78 160000 --- a/pometry-storage-private +++ b/pometry-storage-private @@ -1 +1 @@ -Subproject commit 942230d44640241016f5780c348ee795bc633568 +Subproject commit f94a7ee78e0cb3d99398775d8ca5b666b8e41730 diff --git a/raphtory-api/Cargo.toml b/raphtory-api/Cargo.toml index aedc155f0..7109b4d4f 100644 --- a/raphtory-api/Cargo.toml +++ b/raphtory-api/Cargo.toml @@ -16,6 +16,7 @@ edition.workspace = true [dependencies] serde = { workspace = true, features = ["derive"] } +bytemuck = { workspace = true } chrono.workspace = true dashmap = { workspace = true } rustc-hash = { workspace = true } diff --git a/raphtory-api/src/atomic_extra/mod.rs b/raphtory-api/src/atomic_extra/mod.rs new file mode 100644 index 000000000..c486670e1 --- /dev/null +++ b/raphtory-api/src/atomic_extra/mod.rs @@ -0,0 +1,24 @@ +use std::sync::atomic::{AtomicU64, AtomicUsize}; + +/// Construct atomic slice from mut slice (reimplementation of currently unstable feature) +#[inline] +pub fn atomic_usize_from_mut_slice(v: &mut [usize]) -> &mut [AtomicUsize] { + use std::mem::align_of; + let [] = [(); align_of::() - align_of::()]; + // SAFETY: + // - the mutable reference guarantees unique ownership. + // - the alignment of `usize` and `AtomicUsize` is the + // same, as verified above. + unsafe { &mut *(v as *mut [usize] as *mut [AtomicUsize]) } +} + +#[inline] +pub fn atomic_u64_from_mut_slice(v: &mut [u64]) -> &mut [AtomicU64] { + use std::mem::align_of; + let [] = [(); align_of::() - align_of::()]; + // SAFETY: + // - the mutable reference guarantees unique ownership. + // - the alignment of `u64` and `AtomicU64` is the + // same, as verified above. + unsafe { &mut *(v as *mut [u64] as *mut [AtomicU64]) } +} diff --git a/raphtory-api/src/core/entities/mod.rs b/raphtory-api/src/core/entities/mod.rs index f5f63237a..476f51b33 100644 --- a/raphtory-api/src/core/entities/mod.rs +++ b/raphtory-api/src/core/entities/mod.rs @@ -1,5 +1,6 @@ use self::edges::edge_ref::EdgeRef; use super::input::input_node::parse_u64_strict; +use bytemuck::{Pod, Zeroable}; use num_traits::ToPrimitive; use serde::{Deserialize, Serialize}; use std::{ @@ -12,10 +13,16 @@ pub mod edges; // the only reason this is public is because the physical ids of the nodes don't move #[repr(transparent)] #[derive( - Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default, + Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Pod, Zeroable, )] pub struct VID(pub usize); +impl Default for VID { + fn default() -> Self { + VID(usize::MAX) + } +} + impl VID { pub fn index(&self) -> usize { self.0 @@ -40,10 +47,16 @@ impl From for usize { #[repr(transparent)] #[derive( - Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default, + Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Pod, Zeroable, )] pub struct EID(pub usize); +impl Default for EID { + fn default() -> Self { + EID(usize::MAX) + } +} + impl EID { pub fn as_u64(self) -> u64 { self.0 as u64 @@ -105,7 +118,7 @@ pub enum GID { impl Default for GID { fn default() -> Self { - GID::U64(0) + GID::U64(u64::MAX) } } @@ -119,6 +132,12 @@ impl Display for GID { } impl GID { + pub fn dtype(&self) -> GidType { + match self { + GID::U64(_) => GidType::U64, + GID::Str(_) => GidType::Str, + } + } pub fn into_str(self) -> Option { match self { GID::Str(v) => Some(v), @@ -147,7 +166,7 @@ impl GID { } } - pub fn to_str(&self) -> Cow { + pub fn to_str(&self) -> Cow { match self { GID::U64(v) => Cow::Owned(v.to_string()), GID::Str(v) => Cow::Borrowed(v), @@ -202,6 +221,25 @@ pub enum GidRef<'a> { Str(&'a str), } +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub enum GidType { + U64, + Str, +} + +impl Display for GidType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + GidType::U64 => { + write!(f, "Numeric") + } + GidType::Str => { + write!(f, "String") + } + } + } +} + impl Display for GidRef<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -221,6 +259,12 @@ impl<'a> From<&'a GID> for GidRef<'a> { } impl<'a> GidRef<'a> { + pub fn dtype(self) -> GidType { + match self { + GidRef::U64(_) => GidType::U64, + GidRef::Str(_) => GidType::Str, + } + } pub fn as_str(self) -> Option<&'a str> { match self { GidRef::Str(s) => Some(s), diff --git a/raphtory-api/src/lib.rs b/raphtory-api/src/lib.rs index eeaed5934..6a943145f 100644 --- a/raphtory-api/src/lib.rs +++ b/raphtory-api/src/lib.rs @@ -1,4 +1,4 @@ +pub mod atomic_extra; pub mod core; - #[cfg(feature = "python")] pub mod python; diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index ccc13f84e..4a000e104 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -40,7 +40,7 @@ serde_json = { workspace = true } ouroboros = { workspace = true } either = { workspace = true } kdam = { workspace = true } - +bytemuck = { workspace = true } # io optional dependencies csv = { workspace = true, optional = true } @@ -73,6 +73,7 @@ pometry-storage = { workspace = true, optional = true } prost = { workspace = true, optional = true } prost-types = { workspace = true, optional = true } + [target.'cfg(target_os = "macos")'.dependencies] snmalloc-rs = { workspace = true } diff --git a/raphtory/src/core/entities/edges/edge_store.rs b/raphtory/src/core/entities/edges/edge_store.rs index c4defa93d..3d6c0e682 100644 --- a/raphtory/src/core/entities/edges/edge_store.rs +++ b/raphtory/src/core/entities/edges/edge_store.rs @@ -115,6 +115,10 @@ impl EdgeStore { } } + pub fn initialised(&self) -> bool { + self.eid != EID::default() + } + pub fn as_edge_ref(&self) -> EdgeRef { EdgeRef::new_outgoing(self.eid, self.src, self.dst) } diff --git a/raphtory/src/core/entities/graph/logical_to_physical.rs b/raphtory/src/core/entities/graph/logical_to_physical.rs index 16b7c4a86..c06d17b78 100644 --- a/raphtory/src/core/entities/graph/logical_to_physical.rs +++ b/raphtory/src/core/entities/graph/logical_to_physical.rs @@ -7,7 +7,7 @@ use dashmap::mapref::entry::Entry; use either::Either; use once_cell::sync::OnceCell; use raphtory_api::core::{ - entities::{GidRef, VID}, + entities::{GidRef, GidType, VID}, storage::{dict_mapper::MaybeNew, FxDashMap}, }; use serde::{Deserialize, Deserializer, Serialize}; @@ -47,6 +47,12 @@ pub(crate) struct Mapping { } impl Mapping { + pub fn dtype(&self) -> Option { + self.map.get().map(|map| match map { + Map::U64(_) => GidType::U64, + Map::Str(_) => GidType::Str, + }) + } pub fn new() -> Self { Mapping { map: OnceCell::new(), @@ -69,7 +75,44 @@ impl Mapping { .ok_or_else(|| MutateGraphError::InvalidNodeId(gid.into()).into()) } - pub fn get_or_init<'a>( + pub fn get_or_init( + &self, + gid: GidRef, + next_id: impl FnOnce() -> VID, + ) -> Result, GraphError> { + let map = self.map.get_or_init(|| match &gid { + GidRef::U64(_) => Map::U64(FxDashMap::default()), + GidRef::Str(_) => Map::Str(FxDashMap::default()), + }); + let vid = match gid { + GidRef::U64(id) => map.as_u64().map(|m| match m.entry(id) { + Entry::Occupied(id) => MaybeNew::Existing(*id.get()), + Entry::Vacant(entry) => { + let vid = next_id(); + entry.insert(vid); + MaybeNew::New(vid) + } + }), + GidRef::Str(id) => map.as_str().map(|m| { + m.get(id) + .map(|vid| MaybeNew::Existing(*vid)) + .unwrap_or_else(|| match m.entry(id.to_owned()) { + Entry::Occupied(entry) => MaybeNew::Existing(*entry.get()), + Entry::Vacant(entry) => { + let vid = next_id(); + entry.insert(vid); + MaybeNew::New(vid) + } + }) + }), + }; + + vid.ok_or_else(|| GraphError::FailedToMutateGraph { + source: MutateGraphError::InvalidNodeId(gid.into()), + }) + } + + pub fn get_or_init_node<'a>( &self, gid: GidRef, f_init: impl FnOnce() -> UninitialisedEntry<'a, NodeStore>, diff --git a/raphtory/src/core/entities/graph/tgraph.rs b/raphtory/src/core/entities/graph/tgraph.rs index 82694a68e..c9cd67f1a 100644 --- a/raphtory/src/core/entities/graph/tgraph.rs +++ b/raphtory/src/core/entities/graph/tgraph.rs @@ -12,7 +12,7 @@ use crate::{ LayerIds, EID, VID, }, storage::{ - raw_edges::EdgeWGuard, + raw_edges::MutEdge, timeindex::{AsTime, TimeIndexEntry}, PairEntryMut, }, @@ -20,7 +20,6 @@ use crate::{ Direction, Prop, }, db::api::{storage::graph::edges::edge_storage_ops::EdgeStorageOps, view::Layer}, - DEFAULT_NUM_SHARDS, }; use dashmap::DashSet; use either::Either; @@ -81,7 +80,7 @@ impl std::fmt::Display for TemporalGraph { impl Default for TemporalGraph { fn default() -> Self { - Self::new(DEFAULT_NUM_SHARDS) + Self::new(rayon::current_num_threads()) } } @@ -344,7 +343,7 @@ impl TemporalGraph { eid: EID, t: TimeIndexEntry, layer: usize, - edge_fn: impl FnOnce(&mut EdgeWGuard) -> Result<(), GraphError>, + edge_fn: impl FnOnce(MutEdge) -> Result<(), GraphError>, ) -> Result<(), GraphError> { let (src, dst) = { let edge_r = self.storage.edges.get_edge(eid); @@ -357,10 +356,10 @@ impl TemporalGraph { self.link_nodes_inner(&mut node_pair, eid, t, layer)?; } let mut edge_w = self.storage.edges.get_edge_mut(eid); - edge_fn(&mut edge_w) + edge_fn(edge_w.as_mut()) } - pub(crate) fn link_nodes Result<(), GraphError>>( + pub(crate) fn link_nodes Result<(), GraphError>>( &self, src_id: VID, dst_id: VID, @@ -371,12 +370,12 @@ impl TemporalGraph { let edge = { let mut node_pair = self.storage.pair_node_mut(src_id, dst_id); let src = node_pair.get_i(); - let edge = match src.find_edge_eid(dst_id, &LayerIds::All) { + let mut edge = match src.find_edge_eid(dst_id, &LayerIds::All) { Some(edge_id) => Either::Left(self.storage.get_edge_mut(edge_id)), None => Either::Right(self.storage.push_edge(EdgeStore::new(src_id, dst_id))), }; - let eid = match edge.as_ref() { - Either::Left(edge) => edge.edge_store().eid, + let eid = match edge.as_mut() { + Either::Left(edge) => edge.as_ref().eid(), Either::Right(edge) => edge.value().eid, }; self.link_nodes_inner(&mut node_pair, eid, t, layer)?; @@ -385,13 +384,13 @@ impl TemporalGraph { match edge { Either::Left(mut edge) => { - edge_fn(&mut edge)?; - Ok(MaybeNew::Existing(edge.edge_store().eid)) + edge_fn(edge.as_mut())?; + Ok(MaybeNew::Existing(edge.as_ref().eid())) } Either::Right(edge) => { let mut edge = edge.init(); - edge_fn(&mut edge)?; - Ok(MaybeNew::New(edge.edge_store().eid)) + edge_fn(edge.as_mut())?; + Ok(MaybeNew::New(edge.as_ref().eid())) } } } diff --git a/raphtory/src/core/entities/graph/tgraph_storage.rs b/raphtory/src/core/entities/graph/tgraph_storage.rs index ae911a89f..ad49f42ae 100644 --- a/raphtory/src/core/entities/graph/tgraph_storage.rs +++ b/raphtory/src/core/entities/graph/tgraph_storage.rs @@ -5,7 +5,7 @@ use crate::core::{ raw_edges::{ EdgeArcGuard, EdgeRGuard, EdgeWGuard, EdgesStorage, LockedEdges, UninitialisedEdge, }, - Entry, EntryMut, PairEntryMut, UninitialisedEntry, + Entry, EntryMut, NodeStorage, PairEntryMut, UninitialisedEntry, }, }; use serde::{Deserialize, Serialize}; @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, PartialEq)] pub(crate) struct GraphStorage { // node storage with having (id, time_index, properties, adj list for each layer) - pub(crate) nodes: storage::RawStorage, + pub(crate) nodes: NodeStorage, pub(crate) edges: EdgesStorage, } @@ -21,7 +21,7 @@ pub(crate) struct GraphStorage { impl GraphStorage { pub(crate) fn new(num_locks: usize) -> Self { Self { - nodes: storage::RawStorage::new(num_locks), + nodes: storage::NodeStorage::new(num_locks), edges: EdgesStorage::new(num_locks), } } @@ -48,7 +48,7 @@ impl GraphStorage { #[inline] pub(crate) fn push_node(&self, node: NodeStore) -> UninitialisedEntry { - self.nodes.push(node, |vid, node| node.vid = vid.into()) + self.nodes.push(node) } #[inline] pub(crate) fn push_edge(&self, edge: EdgeStore) -> UninitialisedEdge { diff --git a/raphtory/src/core/entities/nodes/node_store.rs b/raphtory/src/core/entities/nodes/node_store.rs index 686f1ac19..ac7ad74b9 100644 --- a/raphtory/src/core/entities/nodes/node_store.rs +++ b/raphtory/src/core/entities/nodes/node_store.rs @@ -14,6 +14,7 @@ use crate::core::{ Direction, Prop, }; use itertools::Itertools; +use raphtory_api::core::entities::GidRef; use serde::{Deserialize, Serialize}; use std::{iter, ops::Deref}; @@ -31,27 +32,38 @@ pub struct NodeStore { } impl NodeStore { - pub fn new(global_id: GID, t: TimeIndexEntry) -> Self { + #[inline] + pub fn is_initialised(&self) -> bool { + self.vid != VID::default() + } + + #[inline] + pub fn init(&mut self, vid: VID, gid: GidRef) { + if !self.is_initialised() { + self.vid = vid; + self.global_id = gid.to_owned(); + } + } + + pub fn empty(global_id: GID) -> Self { let mut layers = Vec::with_capacity(1); layers.push(Adj::Solo); Self { global_id, - vid: 0.into(), - timestamps: TimeIndex::one(t.t()), + vid: VID(0), + timestamps: TimeIndex::Empty, layers, props: None, node_type: 0, } } - pub fn empty(global_id: GID) -> Self { - let mut layers = Vec::with_capacity(1); - layers.push(Adj::Solo); + pub fn resolved(global_id: GID, vid: VID) -> Self { Self { global_id, - vid: VID(0), - timestamps: TimeIndex::Empty, - layers, + vid, + timestamps: Default::default(), + layers: vec![], props: None, node_type: 0, } diff --git a/raphtory/src/core/storage/mod.rs b/raphtory/src/core/storage/mod.rs index 83962a493..bd8a4269b 100644 --- a/raphtory/src/core/storage/mod.rs +++ b/raphtory/src/core/storage/mod.rs @@ -1,5 +1,7 @@ +use crate::core::entities::nodes::node_store::NodeStore; use lock_api; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use raphtory_api::core::entities::{GidRef, VID}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use std::{ @@ -19,7 +21,6 @@ pub mod sorted_vec_map; pub mod timeindex; type ArcRwLockReadGuard = lock_api::ArcRwLockReadGuard; - #[must_use] pub struct UninitialisedEntry<'a, T> { offset: usize, @@ -79,13 +80,12 @@ impl LockVec { } #[derive(Serialize, Deserialize, Debug)] -pub struct RawStorage { - pub(crate) data: Box<[LockVec]>, +pub struct NodeStorage { + pub(crate) data: Box<[LockVec]>, len: AtomicUsize, - _index: PhantomData, } -impl PartialEq for RawStorage { +impl PartialEq for NodeStorage { fn eq(&self, other: &Self) -> bool { self.data.eq(&other.data) } @@ -166,26 +166,20 @@ where } } -impl RawStorage -where - usize: From, -{ - pub fn count_with_filter bool + Send + Sync>(&self, f: F) -> usize { +impl NodeStorage { + pub fn count_with_filter bool + Send + Sync>(&self, f: F) -> usize { self.read_lock().par_iter().filter(|x| f(x)).count() } } -impl RawStorage -where - usize: From, -{ +impl NodeStorage { #[inline] fn resolve(&self, index: usize) -> (usize, usize) { resolve(index, self.data.len()) } #[inline] - pub fn read_lock(&self) -> ReadLockedStorage { + pub fn read_lock(&self) -> ReadLockedStorage { let guards = self .data .iter() @@ -198,21 +192,26 @@ where } } + pub(crate) fn write_lock(&self) -> WriteLockedNodes { + WriteLockedNodes { + guards: self.data.iter().map(|lock| lock.data.write()).collect(), + } + } + pub fn new(n_locks: usize) -> Self { - let data: Box<[LockVec]> = (0..n_locks) + let data: Box<[LockVec]> = (0..n_locks) .map(|_| LockVec::new()) .collect::>() .into(); Self { data, len: AtomicUsize::new(0), - _index: PhantomData, } } - pub fn push(&self, mut value: T, f: F) -> UninitialisedEntry { + pub fn push(&self, mut value: NodeStore) -> UninitialisedEntry { let index = self.len.fetch_add(1, Ordering::Relaxed); - f(index, &mut value); + value.vid = VID(index); let (bucket, offset) = self.resolve(index); let guard = self.data[bucket].data.write(); UninitialisedEntry { @@ -222,27 +221,26 @@ where } } - pub fn set(&self, index: Index, value: T) -> UninitialisedEntry { - let index = index.into(); + pub fn set(&self, value: NodeStore) { + let VID(index) = value.vid; self.len.fetch_max(index + 1, Ordering::Relaxed); let (bucket, offset) = self.resolve(index); - let guard = self.data[bucket].data.write(); - UninitialisedEntry { - offset, - guard, - value, + let mut guard = self.data[bucket].data.write(); + if guard.len() <= offset { + guard.resize_with(offset + 1, NodeStore::default) } + guard[offset] = value } #[inline] - pub fn entry(&self, index: Index) -> Entry<'_, T> { + pub fn entry(&self, index: VID) -> Entry<'_, NodeStore> { let index = index.into(); let (bucket, offset) = self.resolve(index); let guard = self.data[bucket].data.read_recursive(); Entry { offset, guard } } - pub fn entry_arc(&self, index: Index) -> ArcEntry { + pub fn entry_arc(&self, index: VID) -> ArcEntry { let index = index.into(); let (bucket, offset) = self.resolve(index); let guard = &self.data[bucket].data; @@ -253,7 +251,7 @@ where } } - pub fn entry_mut(&self, index: Index) -> EntryMut<'_, T> { + pub fn entry_mut(&self, index: VID) -> EntryMut<'_, NodeStore> { let index = index.into(); let (bucket, offset) = self.resolve(index); let guard = self.data[bucket].data.write(); @@ -261,7 +259,7 @@ where } // This helps get the right locks when adding an edge - pub fn pair_entry_mut(&self, i: Index, j: Index) -> PairEntryMut<'_, T> { + pub fn pair_entry_mut(&self, i: VID, j: VID) -> PairEntryMut<'_, NodeStore> { let i = i.into(); let j = j.into(); let (bucket_i, offset_i) = self.resolve(i); @@ -298,6 +296,84 @@ where pub fn len(&self) -> usize { self.len.load(Ordering::SeqCst) } + + pub fn next_id(&self) -> VID { + VID(self.len.fetch_add(1, Ordering::Relaxed)) + } +} + +pub struct WriteLockedNodes<'a> { + guards: Vec>>, +} + +pub struct NodeShardWriter<'a> { + shard: &'a mut Vec, + shard_id: usize, + num_shards: usize, +} + +impl<'a> NodeShardWriter<'a> { + #[inline] + fn resolve(&self, index: VID) -> Option { + let (shard_id, offset) = resolve(index.into(), self.num_shards); + (shard_id == self.shard_id).then_some(offset) + } + + #[inline] + pub fn get_mut(&mut self, index: VID) -> Option<&mut NodeStore> { + self.resolve(index).map(|offset| &mut self.shard[offset]) + } + + pub fn set(&mut self, vid: VID, gid: GidRef) { + if let Some(offset) = self.resolve(vid) { + if offset >= self.shard.len() { + self.shard.resize_with(offset + 1, NodeStore::default); + } + self.shard[offset] = NodeStore::resolved(gid.to_owned(), vid); + } + } + + pub fn shard_id(&self) -> usize { + self.shard_id + } + + fn resize(&mut self, new_global_len: usize) { + let mut new_len = new_global_len / self.num_shards; + if self.shard_id < new_global_len % self.num_shards { + new_len += 1; + } + if new_len > self.shard.len() { + self.shard.resize_with(new_len, Default::default) + } + } +} + +impl<'a> WriteLockedNodes<'a> { + pub fn par_iter_mut(&mut self) -> impl IndexedParallelIterator + '_ { + let num_shards = self.guards.len(); + let shards: Vec<&mut Vec> = self + .guards + .iter_mut() + .map(|guard| guard.deref_mut()) + .collect(); + shards + .into_par_iter() + .enumerate() + .map(move |(shard_id, shard)| NodeShardWriter { + shard, + shard_id, + num_shards, + }) + } + + pub fn resize(&mut self, new_len: usize) { + self.par_iter_mut() + .for_each(|mut shard| shard.resize(new_len)) + } + + pub fn num_shards(&self) -> usize { + self.guards.len() + } } #[derive(Debug)] @@ -401,75 +477,90 @@ impl<'a, T> DerefMut for EntryMut<'a, T> { #[cfg(test)] mod test { - use super::RawStorage; + use super::NodeStorage; + use crate::core::entities::nodes::node_store::NodeStore; use pretty_assertions::assert_eq; use quickcheck_macros::quickcheck; + use raphtory_api::core::entities::{GID, VID}; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; + use std::borrow::Cow; #[test] fn add_5_values_to_storage() { - let storage = RawStorage::::new(2); + let storage = NodeStorage::new(2); for i in 0..5 { - storage.push(i.to_string(), |_, _| {}).init(); + storage.push(NodeStore::empty(i.into())).init(); } assert_eq!(storage.len(), 5); for i in 0..5 { - let entry = storage.entry(i); - assert_eq!(*entry, i.to_string()); + let entry = storage.entry(VID(i)); + assert_eq!(entry.vid, VID(i)); } let items_iter = storage.read_lock().into_iter(); - let actual = items_iter.map(|s| (*s).to_owned()).collect::>(); + let actual = items_iter.map(|s| s.vid.index()).collect::>(); - assert_eq!(actual, vec!["0", "2", "4", "1", "3"]); + assert_eq!(actual, vec![0, 2, 4, 1, 3]); } #[test] fn test_index_correctness() { - let storage = RawStorage::::new(2); + let storage = NodeStorage::new(2); for i in 0..5 { - storage.push(i.to_string(), |_, _| {}).init(); + storage.push(NodeStore::empty(i.into())).init(); } let locked = storage.read_lock(); - let actual: Vec<_> = (0..5).map(|i| (i, locked.get(i).as_str())).collect(); + let actual: Vec<_> = (0..5) + .map(|i| (i, locked.get(VID(i)).global_id.to_str())) + .collect(); + assert_eq!( actual, - vec![(0usize, "0"), (1, "1"), (2, "2"), (3, "3"), (4, "4")] + vec![ + (0usize, Cow::Borrowed("0")), + (1, "1".into()), + (2, "2".into()), + (3, "3".into()), + (4, "4".into()) + ] ); } #[test] fn test_entry() { - let storage = RawStorage::::new(2); + let storage = NodeStorage::new(2); for i in 0..5 { - storage.push(i.to_string(), |_, _| {}).init(); + storage.push(NodeStore::empty(i.into())).init(); } for i in 0..5 { - let entry = storage.entry(i); - assert_eq!(*entry, i.to_string()); + let entry = storage.entry(VID(i)); + assert_eq!(*entry.global_id.to_str(), i.to_string()); } } #[quickcheck] - fn concurrent_push(v: Vec) -> bool { - let storage = RawStorage::::new(16); + fn concurrent_push(v: Vec) -> bool { + let storage = NodeStorage::new(16); let mut expected = v .into_par_iter() .map(|v| { - storage.push(v, |_, _| {}).init(); + storage.push(NodeStore::empty(GID::U64(v))).init(); v }) .collect::>(); let locked = storage.read_lock(); - let mut actual: Vec<_> = locked.iter().copied().collect(); + let mut actual: Vec<_> = locked + .iter() + .map(|n| n.global_id.as_u64().unwrap()) + .collect(); actual.sort(); expected.sort(); diff --git a/raphtory/src/core/storage/raw_edges.rs b/raphtory/src/core/storage/raw_edges.rs index 0d84ed174..71b8c02ec 100644 --- a/raphtory/src/core/storage/raw_edges.rs +++ b/raphtory/src/core/storage/raw_edges.rs @@ -5,7 +5,6 @@ use crate::{ LayerIds, }, db::api::storage::graph::edges::edge_storage_ops::{EdgeStorageOps, MemEdge}, - DEFAULT_NUM_SHARDS, }; use lock_api::ArcRwLockReadGuard; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -13,9 +12,9 @@ use raphtory_api::core::{entities::EID, storage::timeindex::TimeIndexEntry}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use std::{ - ops::Deref, + ops::{Deref, DerefMut}, sync::{ - atomic::{self, AtomicUsize}, + atomic::{self, AtomicUsize, Ordering}, Arc, }, }; @@ -108,7 +107,7 @@ impl PartialEq for EdgesStorage { impl Default for EdgesStorage { fn default() -> Self { - Self::new(DEFAULT_NUM_SHARDS) + Self::new(rayon::current_num_threads()) } } @@ -133,6 +132,10 @@ impl EdgesStorage { self.len.load(atomic::Ordering::SeqCst) } + pub fn next_id(&self) -> EID { + EID(self.len.fetch_add(1, Ordering::Relaxed)) + } + pub fn read_lock(&self) -> LockedEdges { LockedEdges { shards: self @@ -144,6 +147,12 @@ impl EdgesStorage { } } + pub fn write_lock(&self) -> WriteLockedEdges { + WriteLockedEdges { + shards: self.shards.iter().map(|shard| shard.write()).collect(), + } + } + #[inline] fn resolve(&self, index: usize) -> (usize, usize) { resolve(index, self.shards.len()) @@ -214,10 +223,28 @@ pub struct EdgeWGuard<'a> { } impl<'a> EdgeWGuard<'a> { - pub fn edge_store(&self) -> &EdgeStore { - &self.guard.edge_ids[self.i] + pub fn as_mut(&mut self) -> MutEdge { + MutEdge { + guard: self.guard.deref_mut(), + i: self.i, + } + } + + pub fn as_ref(&self) -> MemEdge { + MemEdge::new(&self.guard, self.i) + } + + pub fn eid(&self) -> EID { + self.as_ref().eid() } +} + +pub struct MutEdge<'a> { + guard: &'a mut EdgeShard, + i: usize, +} +impl<'a> MutEdge<'a> { pub fn edge_store_mut(&mut self) -> &mut EdgeStore { &mut self.guard.edge_ids[self.i] } @@ -348,3 +375,62 @@ impl LockedEdges { }) } } + +pub struct EdgeShardWriter<'a> { + shard: &'a mut EdgeShard, + shard_id: usize, + num_shards: usize, +} + +impl<'a> EdgeShardWriter<'a> { + /// Map an edge id to local offset if it is in the shard + fn resolve(&self, eid: EID) -> Option { + let EID(eid) = eid; + let (bucket, offset) = resolve(eid, self.num_shards); + (bucket == self.shard_id).then_some(offset) + } + + pub fn get_mut(&mut self, eid: EID) -> Option { + let offset = self.resolve(eid)?; + if self.shard.edge_ids.len() <= offset { + self.shard + .edge_ids + .resize_with(offset + 1, EdgeStore::default) + } + Some(MutEdge { + guard: self.shard, + i: offset, + }) + } + + pub fn shard_id(&self) -> usize { + self.shard_id + } +} + +pub struct WriteLockedEdges<'a> { + shards: Vec>, +} + +impl<'a> WriteLockedEdges<'a> { + pub fn par_iter_mut(&mut self) -> impl IndexedParallelIterator + '_ { + let num_shards = self.shards.len(); + let shards: Vec<_> = self + .shards + .iter_mut() + .map(|shard| shard.deref_mut()) + .collect(); + shards + .into_par_iter() + .enumerate() + .map(move |(shard_id, shard)| EdgeShardWriter { + shard, + shard_id, + num_shards, + }) + } + + pub fn num_shards(&self) -> usize { + self.shards.len() + } +} diff --git a/raphtory/src/core/utils/errors.rs b/raphtory/src/core/utils/errors.rs index b6b95d696..7bd6a7fb2 100644 --- a/raphtory/src/core/utils/errors.rs +++ b/raphtory/src/core/utils/errors.rs @@ -5,7 +5,10 @@ use polars_arrow::{datatypes::ArrowDataType, legacy::error}; use pometry_storage::RAError; #[cfg(feature = "python")] use pyo3::PyErr; -use raphtory_api::core::{entities::GID, storage::arc_str::ArcStr}; +use raphtory_api::core::{ + entities::{GidType, GID}, + storage::arc_str::ArcStr, +}; use std::{io, path::PathBuf}; #[cfg(feature = "search")] use tantivy; @@ -59,6 +62,10 @@ pub enum LoadError { MissingNodeError, #[error("Missing value for timestamp")] MissingTimeError, + #[error("Node IDs have the wrong type, expected {existing}, got {new}")] + NodeIdTypeError { existing: GidType, new: GidType }, + #[error("Fatal load error, graph may be in a dirty state.")] + FatalError, } #[cfg(feature = "proto")] diff --git a/raphtory/src/db/api/mutation/internal/internal_addition_ops.rs b/raphtory/src/db/api/mutation/internal/internal_addition_ops.rs index 8c3346d25..5adb417d3 100644 --- a/raphtory/src/db/api/mutation/internal/internal_addition_ops.rs +++ b/raphtory/src/db/api/mutation/internal/internal_addition_ops.rs @@ -5,13 +5,15 @@ use crate::{ utils::errors::GraphError, Prop, PropType, }, - db::api::view::internal::Base, + db::api::{storage::graph::locked::WriteLockedGraph, view::internal::Base}, }; use enum_dispatch::enum_dispatch; -use raphtory_api::core::storage::dict_mapper::MaybeNew; +use raphtory_api::core::{entities::GidType, storage::dict_mapper::MaybeNew}; #[enum_dispatch] pub trait InternalAdditionOps { + fn id_type(&self) -> Option; + fn write_lock(&self) -> Result; fn num_shards(&self) -> Result; /// get the sequence id for the next event fn next_event_id(&self) -> Result; @@ -102,6 +104,16 @@ pub trait DelegateAdditionOps { } impl InternalAdditionOps for G { + #[inline] + fn id_type(&self) -> Option { + self.graph().id_type() + } + + #[inline] + fn write_lock(&self) -> Result { + self.graph().write_lock() + } + #[inline] fn num_shards(&self) -> Result { self.graph().num_shards() diff --git a/raphtory/src/db/api/storage/graph/locked.rs b/raphtory/src/db/api/storage/graph/locked.rs index 5a8b421e2..f8e5a83fe 100644 --- a/raphtory/src/db/api/storage/graph/locked.rs +++ b/raphtory/src/db/api/storage/graph/locked.rs @@ -1,9 +1,13 @@ -use std::sync::Arc; - use crate::core::{ entities::{graph::tgraph::TemporalGraph, nodes::node_store::NodeStore, VID}, - storage::{raw_edges::LockedEdges, ReadLockedStorage}, + storage::{ + raw_edges::{LockedEdges, WriteLockedEdges}, + ReadLockedStorage, WriteLockedNodes, + }, + utils::errors::GraphError, }; +use raphtory_api::core::{entities::GidRef, storage::dict_mapper::MaybeNew}; +use std::sync::Arc; #[derive(Debug)] pub struct LockedGraph { @@ -51,3 +55,42 @@ impl Clone for LockedGraph { } } } + +pub struct WriteLockedGraph<'a> { + pub nodes: WriteLockedNodes<'a>, + pub edges: WriteLockedEdges<'a>, + pub graph: &'a TemporalGraph, +} + +impl<'a> WriteLockedGraph<'a> { + pub(crate) fn new(graph: &'a TemporalGraph) -> Self { + let nodes = graph.storage.nodes.write_lock(); + let edges = graph.storage.edges.write_lock(); + Self { + nodes, + edges, + graph, + } + } + + pub fn num_nodes(&self) -> usize { + self.graph.storage.nodes.len() + } + pub fn resolve_node(&self, gid: GidRef) -> Result, GraphError> { + self.graph + .logical_to_physical + .get_or_init(gid, || self.graph.storage.nodes.next_id()) + } + + pub fn num_shards(&self) -> usize { + self.nodes.num_shards().max(self.edges.num_shards()) + } + + pub fn edges_mut(&mut self) -> &mut WriteLockedEdges<'a> { + &mut self.edges + } + + pub fn graph(&self) -> &TemporalGraph { + &self.graph + } +} diff --git a/raphtory/src/db/api/storage/graph/storage_ops/additions.rs b/raphtory/src/db/api/storage/graph/storage_ops/additions.rs index 9128594be..bdda43a51 100644 --- a/raphtory/src/db/api/storage/graph/storage_ops/additions.rs +++ b/raphtory/src/db/api/storage/graph/storage_ops/additions.rs @@ -8,17 +8,25 @@ use crate::{ utils::errors::GraphError, PropType, }, - db::api::mutation::internal::InternalAdditionOps, + db::api::{mutation::internal::InternalAdditionOps, storage::graph::locked::WriteLockedGraph}, prelude::Prop, }; use either::Either; use raphtory_api::core::{ - entities::{EID, VID}, + entities::{GidType, EID, VID}, storage::{dict_mapper::MaybeNew, timeindex::TimeIndexEntry}, }; use std::sync::atomic::Ordering; impl InternalAdditionOps for TemporalGraph { + fn id_type(&self) -> Option { + self.logical_to_physical.dtype() + } + + fn write_lock(&self) -> Result { + Ok(WriteLockedGraph::new(self)) + } + fn num_shards(&self) -> Result { Ok(self.storage.nodes.data.len()) } @@ -40,7 +48,7 @@ impl InternalAdditionOps for TemporalGraph { fn resolve_node(&self, n: V) -> Result, GraphError> { match n.as_gid_ref() { Either::Left(id) => { - let ref_mut = self.logical_to_physical.get_or_init(id, || { + let ref_mut = self.logical_to_physical.get_or_init_node(id, || { let node_store = NodeStore::empty(id.into()); self.storage.push_node(node_store) })?; @@ -133,7 +141,7 @@ impl InternalAdditionOps for TemporalGraph { props: &[(usize, Prop)], layer: usize, ) -> Result, GraphError> { - self.link_nodes(src, dst, t, layer, move |edge| { + self.link_nodes(src, dst, t, layer, move |mut edge| { edge.additions_mut(layer).insert(t); if !props.is_empty() { let edge_layer = edge.layer_mut(layer); @@ -153,7 +161,7 @@ impl InternalAdditionOps for TemporalGraph { props: &[(usize, Prop)], layer: usize, ) -> Result<(), GraphError> { - self.link_edge(edge, t, layer, |edge| { + self.link_edge(edge, t, layer, |mut edge| { edge.additions_mut(layer).insert(t); if !props.is_empty() { let edge_layer = edge.layer_mut(layer); @@ -168,6 +176,22 @@ impl InternalAdditionOps for TemporalGraph { } impl InternalAdditionOps for GraphStorage { + fn id_type(&self) -> Option { + match self { + GraphStorage::Unlocked(storage) => storage.id_type(), + GraphStorage::Mem(storage) => storage.graph.id_type(), + #[cfg(feature = "storage")] + GraphStorage::Disk(storage) => Some(storage.inner().id_type()), + } + } + + fn write_lock(&self) -> Result { + match self { + GraphStorage::Unlocked(storage) => storage.write_lock(), + _ => Err(GraphError::AttemptToMutateImmutableGraph), + } + } + fn num_shards(&self) -> Result { match self { GraphStorage::Unlocked(storage) => storage.num_shards(), diff --git a/raphtory/src/db/api/storage/graph/storage_ops/deletions.rs b/raphtory/src/db/api/storage/graph/storage_ops/deletions.rs index febabaef1..f91d7ab09 100644 --- a/raphtory/src/db/api/storage/graph/storage_ops/deletions.rs +++ b/raphtory/src/db/api/storage/graph/storage_ops/deletions.rs @@ -16,7 +16,7 @@ impl InternalDeletionOps for TemporalGraph { dst: VID, layer: usize, ) -> Result, GraphError> { - self.link_nodes(src, dst, t, layer, |new_edge| { + self.link_nodes(src, dst, t, layer, |mut new_edge| { new_edge.deletions_mut(layer).insert(t); Ok(()) }) @@ -28,7 +28,7 @@ impl InternalDeletionOps for TemporalGraph { eid: EID, layer: usize, ) -> Result<(), GraphError> { - self.link_edge(eid, t, layer, |edge| { + self.link_edge(eid, t, layer, |mut edge| { edge.deletions_mut(layer).insert(t); Ok(()) }) diff --git a/raphtory/src/db/api/storage/graph/storage_ops/mod.rs b/raphtory/src/db/api/storage/graph/storage_ops/mod.rs index 668686efb..1557a93ea 100644 --- a/raphtory/src/db/api/storage/graph/storage_ops/mod.rs +++ b/raphtory/src/db/api/storage/graph/storage_ops/mod.rs @@ -13,6 +13,7 @@ use crate::{ properties::{graph_meta::GraphMeta, props::Meta}, LayerIds, EID, VID, }, + utils::errors::GraphError, Direction, }, db::api::{ @@ -22,7 +23,7 @@ use crate::{ edge_storage_ops::EdgeStorageOps, edges::{EdgesStorage, EdgesStorageRef}, }, - locked::LockedGraph, + locked::{LockedGraph, WriteLockedGraph}, nodes::{ node_owned_entry::NodeOwnedEntry, node_storage_ops::{NodeStorageIntoOps, NodeStorageOps}, @@ -125,6 +126,16 @@ impl GraphStorage { } } + pub fn write_lock(&self) -> Result { + match self { + GraphStorage::Unlocked(storage) => { + let locked = WriteLockedGraph::new(storage); + Ok(locked) + } + _ => Err(GraphError::AttemptToMutateImmutableGraph), + } + } + #[inline(always)] pub fn nodes(&self) -> NodesStorageEntry { match self { diff --git a/raphtory/src/db/api/storage/graph/storage_ops/prop_add.rs b/raphtory/src/db/api/storage/graph/storage_ops/prop_add.rs index c042e1459..9a8b6c1b0 100644 --- a/raphtory/src/db/api/storage/graph/storage_ops/prop_add.rs +++ b/raphtory/src/db/api/storage/graph/storage_ops/prop_add.rs @@ -84,6 +84,7 @@ impl InternalPropertyAdditionOps for TemporalGraph { props: &[(usize, Prop)], ) -> Result<(), GraphError> { let mut edge = self.storage.get_edge_mut(eid); + let mut edge = edge.as_mut(); let edge_layer = edge.layer_mut(layer); for (prop_id, prop) in props { let prop = self.process_prop_value(prop); @@ -110,6 +111,7 @@ impl InternalPropertyAdditionOps for TemporalGraph { props: &[(usize, Prop)], ) -> Result<(), GraphError> { let mut edge = self.storage.get_edge_mut(eid); + let mut edge = edge.as_mut(); let edge_layer = edge.layer_mut(layer); for (prop_id, prop) in props { let prop = self.process_prop_value(prop); diff --git a/raphtory/src/db/api/storage/storage.rs b/raphtory/src/db/api/storage/storage.rs index 8610530da..dae3fb0de 100644 --- a/raphtory/src/db/api/storage/storage.rs +++ b/raphtory/src/db/api/storage/storage.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "proto")] +use crate::serialise::incremental::GraphWriter; use crate::{ core::{ entities::{ @@ -11,13 +13,16 @@ use crate::{ mutation::internal::{ InternalAdditionOps, InternalDeletionOps, InternalPropertyAdditionOps, }, - storage::graph::{nodes::node_storage_ops::NodeStorageOps, storage_ops::GraphStorage}, + storage::graph::{ + locked::WriteLockedGraph, nodes::node_storage_ops::NodeStorageOps, + storage_ops::GraphStorage, + }, view::{Base, InheritViewOps}, }, }; use once_cell::sync::OnceCell; use raphtory_api::core::{ - entities::{EID, VID}, + entities::{GidType, EID, VID}, storage::{dict_mapper::MaybeNew, timeindex::TimeIndexEntry}, }; use serde::{Deserialize, Serialize}; @@ -26,15 +31,14 @@ use std::{ sync::Arc, }; -#[cfg(feature = "proto")] -use crate::serialise::incremental::GraphWriter; - #[derive(Debug, Default, Serialize, Deserialize)] pub struct Storage { graph: GraphStorage, #[cfg(feature = "proto")] #[serde(skip)] pub(crate) cache: OnceCell, + // search index (tantivy) + // vector index } impl Display for Storage { @@ -80,6 +84,15 @@ impl Storage { impl InheritViewOps for Storage {} impl InternalAdditionOps for Storage { + #[inline] + fn id_type(&self) -> Option { + self.graph.id_type() + } + + fn write_lock(&self) -> Result { + self.graph.write_lock() + } + #[inline] fn num_shards(&self) -> Result { self.graph.num_shards() diff --git a/raphtory/src/db/api/view/internal/materialize.rs b/raphtory/src/db/api/view/internal/materialize.rs index 77f98b142..afed0292f 100644 --- a/raphtory/src/db/api/view/internal/materialize.rs +++ b/raphtory/src/db/api/view/internal/materialize.rs @@ -23,6 +23,7 @@ use crate::{ edge_entry::EdgeStorageEntry, edge_owned_entry::EdgeOwnedEntry, edge_ref::EdgeStorageRef, edges::EdgesStorage, }, + locked::WriteLockedGraph, nodes::{ node_entry::NodeStorageEntry, node_owned_entry::NodeOwnedEntry, nodes::NodesStorage, @@ -37,7 +38,10 @@ use crate::{ }; use chrono::{DateTime, Utc}; use enum_dispatch::enum_dispatch; -use raphtory_api::core::storage::{arc_str::ArcStr, dict_mapper::MaybeNew}; +use raphtory_api::core::{ + entities::GidType, + storage::{arc_str::ArcStr, dict_mapper::MaybeNew}, +}; use serde::{Deserialize, Serialize}; #[enum_dispatch(CoreGraphOps)] diff --git a/raphtory/src/db/graph/graph.rs b/raphtory/src/db/graph/graph.rs index 20c7c699d..e5d570773 100644 --- a/raphtory/src/db/graph/graph.rs +++ b/raphtory/src/db/graph/graph.rs @@ -546,13 +546,13 @@ mod db_tests { g.add_edge(*t, *src, *dst, NO_PROPS, None).unwrap(); } - let tmp_raphtory_path: TempDir = TempDir::new().expect("Failed to create tempdir"); + let tmp_raphtory_path: TempDir = TempDir::new().unwrap(); let graph_path = format!("{}/graph.bin", tmp_raphtory_path.path().display()); - g.encode(&graph_path).expect("Failed to save graph"); + g.encode(&graph_path).unwrap(); // Load from files - let g2 = Graph::decode(&graph_path).expect("Failed to load graph"); + let g2 = Graph::decode(&graph_path).unwrap(); assert_eq!(g, g2); diff --git a/raphtory/src/io/arrow/dataframe.rs b/raphtory/src/io/arrow/dataframe.rs index d3c95fd2a..d43de1704 100644 --- a/raphtory/src/io/arrow/dataframe.rs +++ b/raphtory/src/io/arrow/dataframe.rs @@ -9,6 +9,7 @@ use polars_arrow::{ datatypes::{ArrowDataType as DataType, TimeUnit}, }; use rayon::prelude::*; +use std::fmt::{Debug, Formatter}; pub(crate) struct DFView { pub names: Vec, @@ -16,6 +17,15 @@ pub(crate) struct DFView { pub num_rows: usize, } +impl Debug for DFView { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DFView") + .field("names", &self.names) + .field("num_rows", &self.num_rows) + .finish() + } +} + impl DFView where I: Iterator>, @@ -48,6 +58,9 @@ impl TimeCol { .as_any() .downcast_ref::>() .ok_or_else(|| LoadError::InvalidTimestamp(arr.data_type().clone()))?; + if arr.null_count() > 0 { + return Err(LoadError::MissingTimeError); + } let arr = if let DataType::Timestamp(_, _) = arr.data_type() { let array = cast::cast( arr, @@ -63,6 +76,7 @@ impl TimeCol { } else { arr.clone() }; + Ok(Self(arr)) } @@ -70,12 +84,16 @@ impl TimeCol { (0..self.0.len()).into_par_iter().map(|i| self.get(i)) } + pub fn iter(&self) -> impl Iterator + '_ { + self.0.values_iter().copied() + } + pub fn get(&self, i: usize) -> Option { self.0.get(i) } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct DFChunk { pub(crate) chunk: Vec>, } diff --git a/raphtory/src/io/arrow/df_loaders.rs b/raphtory/src/io/arrow/df_loaders.rs index 72573865b..98a273067 100644 --- a/raphtory/src/io/arrow/df_loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -1,5 +1,6 @@ use crate::{ core::{ + entities::LayerIds, utils::errors::{GraphError, LoadError}, PropType, }, @@ -11,11 +12,20 @@ use crate::{ prop_handler::*, }, prelude::*, + serialise::incremental::InternalCache, }; +use bytemuck::checked::cast_slice_mut; use kdam::{Bar, BarBuilder, BarExt}; -use raphtory_api::core::storage::{dict_mapper::MaybeNew, timeindex::TimeIndexEntry}; +use raphtory_api::{ + atomic_extra::atomic_usize_from_mut_slice, + core::{ + entities::EID, + storage::{dict_mapper::MaybeNew, timeindex::TimeIndexEntry}, + Direction, + }, +}; use rayon::prelude::*; -use std::collections::HashMap; +use std::{collections::HashMap, sync::atomic::Ordering}; fn build_progress_bar(des: String, num_rows: usize) -> Result { BarBuilder::default() @@ -41,7 +51,6 @@ fn process_shared_properties( } pub(crate) fn load_nodes_from_df< - 'a, G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( df_view: DFView>>, @@ -136,7 +145,7 @@ pub(crate) fn load_nodes_from_df< pub(crate) fn load_edges_from_df< 'a, - G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache, >( df_view: DFView>>, time: &str, @@ -178,6 +187,18 @@ pub(crate) fn load_edges_from_df< let _ = pb.update(0); let mut start_idx = graph.reserve_event_ids(df_view.num_rows)?; + let mut src_col_resolved = vec![]; + let mut dst_col_resolved = vec![]; + let mut eid_col_resolved = vec![]; + + let cache = graph.get_cache(); + let mut write_locked_graph = graph.write_lock()?; + let cache_shards = cache.map(|cache| { + (0..write_locked_graph.num_shards()) + .map(|_| cache.fork()) + .collect::>() + }); + for chunk in df_view.chunks { let df = chunk?; let prop_cols = combine_properties(properties, &properties_indices, &df, |key, dtype| { @@ -190,37 +211,177 @@ pub(crate) fn load_edges_from_df< |key, dtype| graph.resolve_edge_property(key, dtype, true), )?; let layer = lift_layer_col(layer, layer_index, &df)?; + let layer_col_resolved = layer.resolve(graph)?; + let src_col = df.node_col(src_index)?; + src_col.validate(graph, LoadError::MissingSrcError)?; + let dst_col = df.node_col(dst_index)?; + dst_col.validate(graph, LoadError::MissingDstError)?; + let time_col = df.time_col(time_index)?; + + // It's our graph, no one else can change it + src_col_resolved.resize_with(df.len(), Default::default); src_col .par_iter() - .zip(dst_col.par_iter()) - .zip(time_col.par_iter()) - .zip(layer.par_iter()) - .zip(prop_cols.par_rows()) - .zip(const_prop_cols.par_rows()) - .enumerate() - .try_for_each(|(idx, (((((src, dst), time), layer), t_props), c_props))| { - let src = src.ok_or(LoadError::MissingSrcError)?; - let dst = dst.ok_or(LoadError::MissingDstError)?; - let time = time.ok_or(LoadError::MissingTimeError)?; - let time_idx = TimeIndexEntry(time, start_idx + idx); - let src = graph.resolve_node(src)?.inner(); - let dst = graph.resolve_node(dst)?.inner(); - let layer = graph.resolve_layer(layer)?.inner(); - let t_props: Vec<_> = t_props.collect(); - let c_props: Vec<_> = c_props - .chain(shared_constant_properties.iter().cloned()) - .collect(); - let eid = graph - .internal_add_edge(time_idx, src, dst, &t_props, layer)? - .inner(); - if !c_props.is_empty() { - graph.internal_add_constant_edge_properties(eid, layer, &c_props)?; + .zip(src_col_resolved.par_iter_mut()) + .try_for_each(|(gid, resolved)| { + let gid = gid.ok_or(LoadError::FatalError)?; + let vid = write_locked_graph + .resolve_node(gid) + .map_err(|_| LoadError::FatalError)?; + if let Some(cache) = cache { + cache.resolve_node(vid, gid); + } + *resolved = vid.inner(); + Ok::<(), LoadError>(()) + })?; + + dst_col_resolved.resize_with(df.len(), Default::default); + dst_col + .par_iter() + .zip(dst_col_resolved.par_iter_mut()) + .try_for_each(|(gid, resolved)| { + let gid = gid.ok_or(LoadError::FatalError)?; + let vid = write_locked_graph + .resolve_node(gid) + .map_err(|_| LoadError::FatalError)?; + if let Some(cache) = cache { + cache.resolve_node(vid, gid); + } + *resolved = vid.inner(); + Ok::<(), LoadError>(()) + })?; + + write_locked_graph + .nodes + .resize(write_locked_graph.num_nodes()); + + // resolve all the edges + eid_col_resolved.resize_with(df.len(), Default::default); + let eid_col_shared = atomic_usize_from_mut_slice(cast_slice_mut(&mut eid_col_resolved)); + let g = write_locked_graph.graph; + let next_edge_id = || g.storage.edges.next_id(); + let update_time = |time| g.update_time(time); + write_locked_graph + .nodes + .par_iter_mut() + .for_each(|mut shard| { + for (row, ((((src, src_gid), dst), time), layer)) in src_col_resolved + .iter() + .zip(src_col.iter()) + .zip(dst_col_resolved.iter()) + .zip(time_col.iter()) + .zip(layer_col_resolved.iter()) + .enumerate() + { + if let Some(src_node) = shard.get_mut(*src) { + src_node.init(*src, src_gid); + update_time(TimeIndexEntry(time, start_idx + row)); + src_node.update_time(TimeIndexEntry(time, start_idx + row)); + let EID(eid) = match src_node.find_edge_eid(*dst, &LayerIds::All) { + None => { + let eid = next_edge_id(); + src_node.add_edge(*dst, Direction::OUT, *layer, eid); + if let Some(cache_shards) = cache_shards.as_ref() { + cache_shards[shard.shard_id()].resolve_edge( + MaybeNew::New(eid), + *src, + *dst, + ); + } + eid + } + Some(eid) => eid, + }; + eid_col_shared[row].store(eid, Ordering::Relaxed); + } + } + }); + + // link the destinations + write_locked_graph + .nodes + .par_iter_mut() + .for_each(|mut shard| { + for (row, ((((src, (dst, dst_gid)), eid), time), layer)) in src_col_resolved + .iter() + .zip(dst_col_resolved.iter().zip(dst_col.iter())) + .zip(eid_col_resolved.iter()) + .zip(time_col.iter()) + .zip(layer_col_resolved.iter()) + .enumerate() + { + if let Some(node) = shard.get_mut(*dst) { + node.init(*dst, dst_gid); + node.update_time(TimeIndexEntry(time, row + start_idx)); + node.add_edge(*src, Direction::IN, *layer, *eid) + } + } + }); + + write_locked_graph + .edges + .par_iter_mut() + .try_for_each(|mut shard| { + let mut t_props = vec![]; + let mut c_props = vec![]; + for (idx, ((((src, dst), time), eid), layer)) in src_col_resolved + .iter() + .zip(dst_col_resolved.iter()) + .zip(time_col.iter()) + .zip(eid_col_resolved.iter()) + .zip(layer_col_resolved.iter()) + .enumerate() + { + let shard_id = shard.shard_id(); + if let Some(mut edge) = shard.get_mut(*eid) { + let edge_store = edge.edge_store_mut(); + if !edge_store.initialised() { + edge_store.src = *src; + edge_store.dst = *dst; + edge_store.eid = *eid; + } + let t = TimeIndexEntry(time, start_idx + idx); + edge.additions_mut(*layer).insert(t); + t_props.clear(); + t_props.extend(prop_cols.iter_row(idx)); + + c_props.clear(); + c_props.extend(const_prop_cols.iter_row(idx)); + c_props.extend_from_slice(&shared_constant_properties); + + if let Some(caches) = cache_shards.as_ref() { + let cache = &caches[shard_id]; + cache.add_edge_update(t, *eid, &t_props, *layer); + cache.add_edge_cprops(*eid, *layer, &c_props); + } + + if !t_props.is_empty() || !c_props.is_empty() { + let edge_layer = edge.layer_mut(*layer); + + for (id, prop) in t_props.drain(..) { + edge_layer.add_prop(t, id, prop)?; + } + + for (id, prop) in c_props.drain(..) { + edge_layer.update_constant_prop(id, prop)?; + } + } + } } Ok::<(), GraphError>(()) })?; + if let Some(cache) = cache { + cache.write()?; + } + if let Some(cache_shards) = cache_shards.as_ref() { + for cache in cache_shards { + cache.write()?; + } + } + start_idx += df.len(); let _ = pb.update(df.len()); } @@ -434,3 +595,120 @@ pub(crate) fn load_edges_props_from_df< } Ok(()) } + +#[cfg(test)] +mod tests { + use crate::{ + core::utils::errors::GraphError, + db::graph::graph::assert_graph_equal, + io::arrow::{ + dataframe::{DFChunk, DFView}, + df_loaders::load_edges_from_df, + }, + prelude::*, + }; + use itertools::Itertools; + use polars_arrow::array::{MutableArray, MutablePrimitiveArray, MutableUtf8Array}; + use proptest::{ + prelude::{any, Strategy}, + proptest, + }; + use tempfile::NamedTempFile; + + fn build_edge_list( + len: usize, + num_nodes: u64, + ) -> impl Strategy> { + proptest::collection::vec( + ( + 0..num_nodes, + 0..num_nodes, + any::(), + any::(), + any::(), + ), + 0..=len, + ) + } + + fn build_df( + chunk_size: usize, + edges: &[(u64, u64, i64, String, i64)], + ) -> DFView>> { + let chunks = edges.iter().chunks(chunk_size); + let chunks = chunks + .into_iter() + .map(|chunk| { + let mut src_col = MutablePrimitiveArray::new(); + let mut dst_col = MutablePrimitiveArray::new(); + let mut time_col = MutablePrimitiveArray::new(); + let mut str_prop_col = MutableUtf8Array::::new(); + let mut int_prop_col = MutablePrimitiveArray::new(); + for (src, dst, time, str_prop, int_prop) in chunk { + src_col.push_value(*src); + dst_col.push_value(*dst); + time_col.push_value(*time); + str_prop_col.push(Some(str_prop)); + int_prop_col.push_value(*int_prop); + } + let chunk = vec![ + src_col.as_box(), + dst_col.as_box(), + time_col.as_box(), + str_prop_col.as_box(), + int_prop_col.as_box(), + ]; + Ok(DFChunk { chunk }) + }) + .collect_vec(); + DFView { + names: vec![ + "src".to_owned(), + "dst".to_owned(), + "time".to_owned(), + "str_prop".to_owned(), + "int_prop".to_owned(), + ], + chunks: chunks.into_iter(), + num_rows: edges.len(), + } + } + #[test] + fn test_load_edges() { + proptest!(|(edges in build_edge_list(1000, 100), chunk_size in 1usize..=1000)| { + let df_view = build_df(chunk_size, &edges); + let g = Graph::new(); + let props = ["str_prop", "int_prop"]; + load_edges_from_df(df_view, "time", "src", "dst", Some(&props), None, None, None, None, &g).unwrap(); + let g2 = Graph::new(); + for (src, dst, time, str_prop, int_prop) in edges { + g2.add_edge(time, src, dst, [("str_prop", str_prop.clone().into_prop()), ("int_prop", int_prop.into_prop())], None).unwrap(); + let edge = g.edge(src, dst).unwrap().at(time); + assert_eq!(edge.properties().get("str_prop").unwrap_str(), str_prop); + assert_eq!(edge.properties().get("int_prop").unwrap_i64(), int_prop); + } + assert_graph_equal(&g, &g2); + }) + } + + #[test] + fn test_load_edges_with_cache() { + proptest!(|(edges in build_edge_list(100, 100), chunk_size in 1usize..=100)| { + let df_view = build_df(chunk_size, &edges); + let g = Graph::new(); + let cache_file = NamedTempFile::new().unwrap(); + g.cache(cache_file.path()).unwrap(); + let props = ["str_prop", "int_prop"]; + load_edges_from_df(df_view, "time", "src", "dst", Some(&props), None, None, None, None, &g).unwrap(); + let g = Graph::load_cached(cache_file.path()).unwrap(); + let g2 = Graph::new(); + for (src, dst, time, str_prop, int_prop) in edges { + g2.add_edge(time, src, dst, [("str_prop", str_prop.clone().into_prop()), ("int_prop", int_prop.into_prop())], None).unwrap(); + let edge = g.edge(src, dst).unwrap().at(time); + assert_eq!(edge.properties().get("str_prop").unwrap_str(), str_prop); + assert_eq!(edge.properties().get("int_prop").unwrap_i64(), int_prop); + } + assert_graph_equal(&g, &g2); + }) + } +} diff --git a/raphtory/src/io/arrow/layer_col.rs b/raphtory/src/io/arrow/layer_col.rs index 3c0846bf1..2926eedea 100644 --- a/raphtory/src/io/arrow/layer_col.rs +++ b/raphtory/src/io/arrow/layer_col.rs @@ -1,11 +1,15 @@ use crate::{ core::utils::errors::{GraphError, LoadError}, + db::api::mutation::internal::InternalAdditionOps, io::arrow::dataframe::DFChunk, }; use polars_arrow::array::Utf8Array; -use rayon::iter::{ - plumbing::{Consumer, ProducerCallback, UnindexedConsumer}, - IndexedParallelIterator, IntoParallelIterator, ParallelIterator, +use rayon::{ + iter::{ + plumbing::{Consumer, ProducerCallback, UnindexedConsumer}, + IndexedParallelIterator, IntoParallelIterator, ParallelIterator, + }, + prelude::*, }; #[derive(Copy, Clone)] @@ -86,6 +90,29 @@ impl<'a> LayerCol<'a> { } } } + + pub fn resolve( + self, + graph: &(impl InternalAdditionOps + Send + Sync), + ) -> Result, GraphError> { + match self { + LayerCol::Name { name, len } => { + let layer = graph.resolve_layer(name)?.inner(); + Ok(vec![layer; len]) + } + col => { + let iter = col.par_iter(); + let mut res = vec![0usize; iter.len()]; + iter.zip(res.par_iter_mut()) + .try_for_each(|(layer, entry)| { + let layer = graph.resolve_layer(layer)?.inner(); + *entry = layer; + Ok::<(), GraphError>(()) + })?; + Ok(res) + } + } + } } pub(crate) fn lift_layer_col<'a>( diff --git a/raphtory/src/io/arrow/node_col.rs b/raphtory/src/io/arrow/node_col.rs index 0f589cc4e..76f32604a 100644 --- a/raphtory/src/io/arrow/node_col.rs +++ b/raphtory/src/io/arrow/node_col.rs @@ -1,16 +1,22 @@ -use crate::{core::utils::errors::LoadError, io::arrow::dataframe::DFChunk}; +use crate::{ + core::utils::errors::LoadError, db::api::mutation::internal::InternalAdditionOps, + io::arrow::dataframe::DFChunk, +}; use polars_arrow::{ array::{Array, PrimitiveArray, StaticArray, Utf8Array}, datatypes::ArrowDataType, offset::Offset, }; -use raphtory_api::core::entities::GidRef; +use raphtory_api::core::entities::{GidRef, GidType}; use rayon::prelude::{IndexedParallelIterator, *}; -trait NodeColOps: Send + Sync { +trait NodeColOps: Array + Send + Sync { + fn has_missing_values(&self) -> bool { + self.null_count() != 0 + } fn get(&self, i: usize) -> Option; - fn len(&self) -> usize; + fn dtype(&self) -> GidType; } impl NodeColOps for PrimitiveArray { @@ -18,8 +24,8 @@ impl NodeColOps for PrimitiveArray { StaticArray::get(self, i).map(GidRef::U64) } - fn len(&self) -> usize { - self.len() + fn dtype(&self) -> GidType { + GidType::U64 } } @@ -28,8 +34,8 @@ impl NodeColOps for PrimitiveArray { StaticArray::get(self, i).map(|v| GidRef::U64(v as u64)) } - fn len(&self) -> usize { - self.len() + fn dtype(&self) -> GidType { + GidType::U64 } } @@ -38,8 +44,8 @@ impl NodeColOps for PrimitiveArray { StaticArray::get(self, i).map(|v| GidRef::U64(v as u64)) } - fn len(&self) -> usize { - self.len() + fn dtype(&self) -> GidType { + GidType::U64 } } @@ -48,8 +54,8 @@ impl NodeColOps for PrimitiveArray { StaticArray::get(self, i).map(|v| GidRef::U64(v as u64)) } - fn len(&self) -> usize { - self.len() + fn dtype(&self) -> GidType { + GidType::U64 } } @@ -69,9 +75,8 @@ impl NodeColOps for Utf8Array { } } } - - fn len(&self) -> usize { - self.len() + fn dtype(&self) -> GidType { + GidType::Str } } @@ -139,6 +144,27 @@ impl NodeCol { pub fn par_iter(&self) -> impl IndexedParallelIterator>> + '_ { (0..self.0.len()).into_par_iter().map(|i| self.0.get(i)) } + + pub fn iter(&self) -> impl Iterator + '_ { + (0..self.0.len()).map(|i| self.0.get(i).unwrap()) + } + + pub fn validate( + &self, + graph: &impl InternalAdditionOps, + node_missing_error: LoadError, + ) -> Result<(), LoadError> { + if let Some(existing) = graph.id_type().filter(|&id_type| id_type != self.0.dtype()) { + return Err(LoadError::NodeIdTypeError { + existing, + new: self.0.dtype(), + }); + } + if self.0.has_missing_values() { + return Err(node_missing_error); + } + Ok(()) + } } pub fn lift_node_col(index: usize, df: &DFChunk) -> Result { diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs index ee5a828ed..2182bd28a 100644 --- a/raphtory/src/io/parquet_loaders.rs +++ b/raphtory/src/io/parquet_loaders.rs @@ -9,6 +9,7 @@ use crate::{ }, io::arrow::{dataframe::*, df_loaders::*}, prelude::DeletionOps, + serialise::incremental::InternalCache, }; use itertools::Itertools; use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema, Field}; @@ -63,8 +64,8 @@ pub fn load_nodes_from_parquet< Ok(()) } -pub fn load_edges_from_parquet< - G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +pub(crate) fn load_edges_from_parquet< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache, >( graph: &G, parquet_path: impl AsRef, diff --git a/raphtory/src/lib.rs b/raphtory/src/lib.rs index 3668b1b84..f80cabb05 100644 --- a/raphtory/src/lib.rs +++ b/raphtory/src/lib.rs @@ -91,8 +91,6 @@ pub mod graphgen; #[cfg(target_os = "macos")] use snmalloc_rs; -pub const DEFAULT_NUM_SHARDS: usize = 128; - #[cfg(target_os = "macos")] #[global_allocator] static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; diff --git a/raphtory/src/python/graph/graph.rs b/raphtory/src/python/graph/graph.rs index b9310ad4f..d384f8782 100644 --- a/raphtory/src/python/graph/graph.rs +++ b/raphtory/src/python/graph/graph.rs @@ -399,7 +399,7 @@ impl PyGraph { shared_constant_properties: Option>, ) -> Result<(), GraphError> { load_nodes_from_pandas( - self.graph.core_graph(), + &self.graph, df, time, id, @@ -477,7 +477,7 @@ impl PyGraph { layer_col: Option<&str>, ) -> Result<(), GraphError> { load_edges_from_pandas( - self.graph.core_graph(), + &self.graph, df, time, src, @@ -551,7 +551,7 @@ impl PyGraph { shared_constant_properties: Option>, ) -> Result<(), GraphError> { load_node_props_from_pandas( - self.graph.core_graph(), + &self.graph, df, id, node_type, @@ -615,7 +615,7 @@ impl PyGraph { layer_col: Option<&str>, ) -> Result<(), GraphError> { load_edge_props_from_pandas( - self.graph.core_graph(), + &self.graph, df, src, dst, diff --git a/raphtory/src/python/graph/graph_with_deletions.rs b/raphtory/src/python/graph/graph_with_deletions.rs index 3b8bc955d..ff6327c15 100644 --- a/raphtory/src/python/graph/graph_with_deletions.rs +++ b/raphtory/src/python/graph/graph_with_deletions.rs @@ -368,7 +368,7 @@ impl PyPersistentGraph { shared_constant_properties: Option>, ) -> Result<(), GraphError> { load_nodes_from_pandas( - self.graph.core_graph(), + &self.graph, df, time, id, @@ -452,7 +452,7 @@ impl PyPersistentGraph { layer_col: Option<&str>, ) -> Result<(), GraphError> { load_edges_from_pandas( - self.graph.core_graph(), + &self.graph, df, time, src, @@ -533,15 +533,7 @@ impl PyPersistentGraph { layer: Option<&str>, layer_col: Option<&str>, ) -> Result<(), GraphError> { - load_edge_deletions_from_pandas( - self.graph.core_graph(), - df, - time, - src, - dst, - layer, - layer_col, - ) + load_edge_deletions_from_pandas(&self.graph, df, time, src, dst, layer, layer_col) } /// Load edges deletions from a Parquet file into the graph. @@ -605,7 +597,7 @@ impl PyPersistentGraph { shared_constant_properties: Option>, ) -> Result<(), GraphError> { load_node_props_from_pandas( - self.graph.core_graph(), + &self.graph, df, id, node_type, @@ -679,7 +671,7 @@ impl PyPersistentGraph { layer_col: Option<&str>, ) -> Result<(), GraphError> { load_edge_props_from_pandas( - self.graph.core_graph(), + &self.graph, df, src, dst, diff --git a/raphtory/src/python/graph/io/pandas_loaders.rs b/raphtory/src/python/graph/io/pandas_loaders.rs index a7fac2e19..c8a7114b7 100644 --- a/raphtory/src/python/graph/io/pandas_loaders.rs +++ b/raphtory/src/python/graph/io/pandas_loaders.rs @@ -1,8 +1,12 @@ use crate::{ core::{utils::errors::GraphError, Prop}, - db::api::{storage::graph::storage_ops::GraphStorage, view::internal::CoreGraphOps}, + db::api::{ + mutation::internal::{InternalAdditionOps, InternalPropertyAdditionOps}, + view::StaticGraphViewOps, + }, io::arrow::{dataframe::*, df_loaders::*}, python::graph::io::*, + serialise::incremental::InternalCache, }; use polars_arrow::{array::Array, ffi}; use pyo3::{ @@ -12,8 +16,10 @@ use pyo3::{ }; use std::collections::HashMap; -pub fn load_nodes_from_pandas( - graph: &GraphStorage, +pub fn load_nodes_from_pandas< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + graph: &G, df: &PyAny, time: &str, id: &str, @@ -47,8 +53,10 @@ pub fn load_nodes_from_pandas( }) } -pub fn load_edges_from_pandas( - graph: &GraphStorage, +pub(crate) fn load_edges_from_pandas< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + InternalCache, +>( + graph: &G, df: &PyAny, time: &str, src: &str, @@ -84,8 +92,10 @@ pub fn load_edges_from_pandas( }) } -pub fn load_node_props_from_pandas( - graph: &GraphStorage, +pub(crate) fn load_node_props_from_pandas< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + graph: &G, df: &PyAny, id: &str, node_type: Option<&str>, @@ -113,8 +123,10 @@ pub fn load_node_props_from_pandas( }) } -pub fn load_edge_props_from_pandas( - graph: &GraphStorage, +pub(crate) fn load_edge_props_from_pandas< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + graph: &G, df: &PyAny, src: &str, dst: &str, @@ -144,8 +156,10 @@ pub fn load_edge_props_from_pandas( }) } -pub fn load_edge_deletions_from_pandas( - graph: &GraphStorage, +pub fn load_edge_deletions_from_pandas< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + graph: &G, df: &PyAny, time: &str, src: &str, diff --git a/raphtory/src/search/mod.rs b/raphtory/src/search/mod.rs index ddc0ae361..ac19d6eca 100644 --- a/raphtory/src/search/mod.rs +++ b/raphtory/src/search/mod.rs @@ -19,7 +19,7 @@ use crate::{ mutation::internal::{ InheritPropertyAdditionOps, InternalAdditionOps, InternalDeletionOps, }, - storage::graph::edges::edge_storage_ops::EdgeStorageOps, + storage::graph::{edges::edge_storage_ops::EdgeStorageOps, locked::WriteLockedGraph}, view::{ internal::{DynamicGraph, InheritViewOps, IntoDynamic, Static}, Base, StaticGraphViewOps, @@ -29,7 +29,10 @@ use crate::{ }, prelude::*, }; -use raphtory_api::core::storage::{arc_str::ArcStr, dict_mapper::MaybeNew}; +use raphtory_api::core::{ + entities::GidType, + storage::{arc_str::ArcStr, dict_mapper::MaybeNew}, +}; use rayon::{prelude::ParallelIterator, slice::ParallelSlice}; use std::{collections::HashSet, ops::Deref, sync::Arc}; use tantivy::{ @@ -756,6 +759,15 @@ impl<'graph, G: GraphViewOps<'graph>> IndexedGraph { } impl InternalAdditionOps for IndexedGraph { + #[inline] + fn id_type(&self) -> Option { + self.graph.id_type() + } + + fn write_lock(&self) -> Result { + self.graph.write_lock() + } + #[inline] fn num_shards(&self) -> Result { self.graph.num_shards() diff --git a/raphtory/src/serialise/incremental.rs b/raphtory/src/serialise/incremental.rs index c7a6a0830..bb1f9192e 100644 --- a/raphtory/src/serialise/incremental.rs +++ b/raphtory/src/serialise/incremental.rs @@ -26,11 +26,12 @@ use std::{ mem, ops::DerefMut, path::Path, + sync::Arc, }; #[derive(Debug)] pub struct GraphWriter { - writer: Mutex, + writer: Arc>, proto_delta: Mutex, } @@ -49,7 +50,15 @@ fn try_write(writer: &mut File, bytes: &[u8]) -> Result<(), WriteError> { impl GraphWriter { pub fn new(file: File) -> Self { Self { - writer: Mutex::new(file), + writer: Arc::new(Mutex::new(file)), + proto_delta: Default::default(), + } + } + + /// Get an independent writer pointing at the same underlying cache file + pub fn fork(&self) -> Self { + GraphWriter { + writer: self.writer.clone(), proto_delta: Default::default(), } } @@ -207,11 +216,13 @@ impl GraphWriter { } pub fn add_edge_cprops(&self, edge: EID, layer: usize, props: &[(usize, Prop)]) { - self.proto_delta.lock().update_edge_cprops( - edge, - layer, - props.iter().map(|(id, prop)| (*id, prop)), - ) + if !props.is_empty() { + self.proto_delta.lock().update_edge_cprops( + edge, + layer, + props.iter().map(|(id, prop)| (*id, prop)), + ) + } } pub fn delete_edge(&self, edge: EID, t: TimeIndexEntry, layer: usize) { diff --git a/raphtory/src/serialise/serialise.rs b/raphtory/src/serialise/serialise.rs index 26b9be0ad..64179bd3f 100644 --- a/raphtory/src/serialise/serialise.rs +++ b/raphtory/src/serialise/serialise.rs @@ -712,7 +712,7 @@ impl StableDecode for TemporalGraph { let mut node_store = NodeStore::empty(gid.to_owned()); node_store.vid = vid; node_store.node_type = node.type_id as usize; - storage.storage.nodes.set(vid, node_store).init(); + storage.storage.nodes.set(node_store); Ok::<(), GraphError>(()) })?; graph.edges.par_iter().for_each(|edge| {