From 9cdacae93e687386be3f7a96a141151455a4fe3c Mon Sep 17 00:00:00 2001 From: Fabian Murariu <2404621+fabianmurariu@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:33:10 +0000 Subject: [PATCH] Sparse Node temporal props (#1848) * Add support for sparse temporal properties for nodes * remove the need for name when adding sparse properties layer * remove some unused code and relegate progress bars to python * add time to PartialEq check on Properties * fix py3.13 warning and update storage --- pometry-storage-private | 2 +- raphtory-api/src/core/storage/timeindex.rs | 56 ++++++ raphtory-api/src/iter.rs | 12 ++ raphtory-api/src/lib.rs | 2 + .../src/executor/table_provider/edge.rs | 6 +- raphtory-cypher/src/transpiler/mod.rs | 2 +- raphtory/Cargo.toml | 2 +- .../core/entities/properties/graph_meta.rs | 4 +- raphtory/src/core/mod.rs | 34 ---- raphtory/src/core/storage/timeindex.rs | 56 ------ .../db/api/storage/graph/nodes/node_entry.rs | 2 +- raphtory/src/db/api/view/mod.rs | 13 +- raphtory/src/db/graph/graph.rs | 67 ------- .../disk_graph/graph_impl/edge_storage_ops.rs | 10 +- raphtory/src/disk_graph/graph_impl/mod.rs | 1 - .../graph_impl/time_index_into_ops.rs | 176 ------------------ raphtory/src/disk_graph/graph_impl/tprops.rs | 56 +----- .../src/disk_graph/storage_interface/node.rs | 35 ++-- raphtory/src/io/arrow/df_loaders.rs | 8 + raphtory/src/python/graph/disk_graph.rs | 15 +- raphtory/src/python/packages/vectors.rs | 84 +-------- raphtory/src/serialise/incremental.rs | 2 +- scripts/activate_private_storage.py | 2 +- 23 files changed, 135 insertions(+), 512 deletions(-) create mode 100644 raphtory-api/src/iter.rs delete mode 100644 raphtory/src/disk_graph/graph_impl/time_index_into_ops.rs diff --git a/pometry-storage-private b/pometry-storage-private index 8ac845b0db..0d993ebd36 160000 --- a/pometry-storage-private +++ b/pometry-storage-private @@ -1 +1 @@ -Subproject commit 8ac845b0dbeee01db27ef6ecfafd22ab8ad0e262 +Subproject commit 0d993ebd36ba637903c53b61b8e868ff0af63ca0 diff --git a/raphtory-api/src/core/storage/timeindex.rs b/raphtory-api/src/core/storage/timeindex.rs index 7b6d214dd4..c96e0002ca 100644 --- a/raphtory-api/src/core/storage/timeindex.rs +++ b/raphtory-api/src/core/storage/timeindex.rs @@ -23,6 +23,62 @@ pub trait AsTime: fmt::Debug + Copy + Ord + Eq + Send + Sync + 'static { fn new(t: i64, s: usize) -> Self; } +pub trait TimeIndexIntoOps: Sized { + type IndexType: AsTime; + type RangeType: TimeIndexIntoOps; + + fn into_range(self, w: Range) -> Self::RangeType; + + fn into_range_t(self, w: Range) -> Self::RangeType { + self.into_range(Self::IndexType::range(w)) + } + + fn into_iter(self) -> impl Iterator + Send; + + fn into_iter_t(self) -> impl Iterator + Send { + self.into_iter().map(|time| time.t()) + } +} + +pub trait TimeIndexOps: Send + Sync { + type IndexType: AsTime; + type RangeType<'a>: TimeIndexOps + 'a + where + Self: 'a; + + fn active(&self, w: Range) -> bool; + + fn active_t(&self, w: Range) -> bool { + self.active(Self::IndexType::range(w)) + } + + fn range(&self, w: Range) -> Self::RangeType<'_>; + + fn range_t(&self, w: Range) -> Self::RangeType<'_> { + self.range(Self::IndexType::range(w)) + } + + fn first_t(&self) -> Option { + self.first().map(|ti| ti.t()) + } + + fn first(&self) -> Option; + + fn last_t(&self) -> Option { + self.last().map(|ti| ti.t()) + } + + fn last(&self) -> Option; + + fn iter(&self) -> Box + Send + '_>; + + fn iter_t(&self) -> Box + Send + '_> { + Box::new(self.iter().map(|time| time.t())) + } + + fn len(&self) -> usize; +} + impl From for TimeIndexEntry { fn from(value: i64) -> Self { Self::start(value) diff --git a/raphtory-api/src/iter.rs b/raphtory-api/src/iter.rs new file mode 100644 index 0000000000..840687fa0e --- /dev/null +++ b/raphtory-api/src/iter.rs @@ -0,0 +1,12 @@ +pub type BoxedIter = Box + Send>; +pub type BoxedLIter<'a, T> = Box + Send + 'a>; + +pub trait IntoDynBoxed<'a, T> { + fn into_dyn_boxed(self) -> BoxedLIter<'a, T>; +} + +impl<'a, T, I: Iterator + Send + 'a> IntoDynBoxed<'a, T> for I { + fn into_dyn_boxed(self) -> BoxedLIter<'a, T> { + Box::new(self) + } +} diff --git a/raphtory-api/src/lib.rs b/raphtory-api/src/lib.rs index c90b7d8bb9..a5bb02383c 100644 --- a/raphtory-api/src/lib.rs +++ b/raphtory-api/src/lib.rs @@ -3,3 +3,5 @@ pub mod compute; pub mod core; #[cfg(feature = "python")] pub mod python; + +pub mod iter; diff --git a/raphtory-cypher/src/executor/table_provider/edge.rs b/raphtory-cypher/src/executor/table_provider/edge.rs index a980874c63..3312b2e005 100644 --- a/raphtory-cypher/src/executor/table_provider/edge.rs +++ b/raphtory-cypher/src/executor/table_provider/edge.rs @@ -54,7 +54,7 @@ impl EdgeListTableProvider { .as_ref() .layer(layer_id) .edges_storage() - .time() + .time_col() .values() .len(); @@ -186,7 +186,7 @@ fn produce_record_batch( let layer = graph.as_ref().layer(layer_id); let edges = layer.edges_storage(); - let chunked_lists_ts = edges.time(); + let chunked_lists_ts = edges.time_col(); let offsets = chunked_lists_ts.offsets(); // FIXME: potentially implement into_iter_chunks() for chunked arrays to avoid having to collect these chunks, if it turns out to be a problem let time_values_chunks = chunked_lists_ts @@ -257,7 +257,7 @@ fn produce_record_batch( let column_ids = layer .edges_storage() - .data_type() + .prop_dtypes() .iter() .enumerate() .skip(1) // first one is supposed to be time diff --git a/raphtory-cypher/src/transpiler/mod.rs b/raphtory-cypher/src/transpiler/mod.rs index 2b129d4847..ea9099edc1 100644 --- a/raphtory-cypher/src/transpiler/mod.rs +++ b/raphtory-cypher/src/transpiler/mod.rs @@ -262,7 +262,7 @@ fn scan_edges_as_sql_cte( // TODO: this needs to match the schema from EdgeListTableProvider fn full_layer_fields(graph: &DiskGraphStorage, layer_id: usize) -> Option { - let dt = graph.as_ref().layer(layer_id).edges_props_data_type(); + let dt = graph.as_ref().layer(layer_id).edges_props_data_type()?; let arr_dt: arrow_schema::DataType = dt.clone().into(); match arr_dt { arrow_schema::DataType::Struct(fields) => { diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index b21b95e78b..246d42176e 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -40,7 +40,7 @@ quad-rand = { workspace = true } serde_json = { workspace = true } ouroboros = { workspace = true } either = { workspace = true } -kdam = { workspace = true } +kdam = { workspace = true, optional = true} bytemuck = { workspace = true } tracing = { workspace = true } diff --git a/raphtory/src/core/entities/properties/graph_meta.rs b/raphtory/src/core/entities/properties/graph_meta.rs index e40837ff47..57abbcf780 100644 --- a/raphtory/src/core/entities/properties/graph_meta.rs +++ b/raphtory/src/core/entities/properties/graph_meta.rs @@ -11,7 +11,9 @@ use raphtory_api::core::storage::{ FxDashMap, }; use serde::{Deserialize, Serialize}; -use std::ops::{Deref, DerefMut}; +#[cfg(feature = "proto")] +use std::ops::Deref; +use std::ops::DerefMut; #[derive(Serialize, Deserialize, Debug)] pub struct GraphMeta { diff --git a/raphtory/src/core/mod.rs b/raphtory/src/core/mod.rs index 52611058f1..58205c3010 100644 --- a/raphtory/src/core/mod.rs +++ b/raphtory/src/core/mod.rs @@ -32,7 +32,6 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use itertools::Itertools; use raphtory_api::core::storage::arc_str::ArcStr; use serde::{Deserialize, Serialize}; -use serde_json::Value; use std::{ cmp::Ordering, collections::HashMap, @@ -171,39 +170,6 @@ impl PartialOrd for Prop { } impl Prop { - pub fn to_json(&self) -> Value { - match self { - Prop::Str(value) => Value::String(value.to_string()), - Prop::U8(value) => Value::Number((*value).into()), - Prop::U16(value) => Value::Number((*value).into()), - Prop::I32(value) => Value::Number((*value).into()), - Prop::I64(value) => Value::Number((*value).into()), - Prop::U32(value) => Value::Number((*value).into()), - Prop::U64(value) => Value::Number((*value).into()), - Prop::F32(value) => Value::Number(serde_json::Number::from_f64(*value as f64).unwrap()), - Prop::F64(value) => Value::Number(serde_json::Number::from_f64(*value).unwrap()), - Prop::Bool(value) => Value::Bool(*value), - Prop::List(value) => { - let vec: Vec = value.iter().map(|v| v.to_json()).collect(); - Value::Array(vec) - } - Prop::Map(value) => { - let map: serde_json::Map = value - .iter() - .map(|(k, v)| (k.to_string(), v.to_json())) - .collect(); - Value::Object(map) - } - Prop::DTime(value) => Value::String(value.to_string()), - Prop::NDTime(value) => Value::String(value.to_string()), - Prop::Graph(_) => Value::String("Graph cannot be converted to JSON".to_string()), - Prop::PersistentGraph(_) => { - Value::String("Persistent Graph cannot be converted to JSON".to_string()) - } - Prop::Document(DocumentInput { content, .. }) => Value::String(content.to_owned()), // TODO: return Value::Object ?? - } - } - pub fn dtype(&self) -> PropType { match self { Prop::Str(_) => PropType::Str, diff --git a/raphtory/src/core/storage/timeindex.rs b/raphtory/src/core/storage/timeindex.rs index fe58e28693..e79e71c247 100644 --- a/raphtory/src/core/storage/timeindex.rs +++ b/raphtory/src/core/storage/timeindex.rs @@ -281,62 +281,6 @@ impl<'a, T: AsTime, Ops: TimeIndexOps, V: AsRef> + Send } } -pub trait TimeIndexOps: Send + Sync { - type IndexType: AsTime; - type RangeType<'a>: TimeIndexOps + 'a - where - Self: 'a; - - fn active(&self, w: Range) -> bool; - - fn active_t(&self, w: Range) -> bool { - self.active(Self::IndexType::range(w)) - } - - fn range(&self, w: Range) -> Self::RangeType<'_>; - - fn range_t(&self, w: Range) -> Self::RangeType<'_> { - self.range(Self::IndexType::range(w)) - } - - fn first_t(&self) -> Option { - self.first().map(|ti| ti.t()) - } - - fn first(&self) -> Option; - - fn last_t(&self) -> Option { - self.last().map(|ti| ti.t()) - } - - fn last(&self) -> Option; - - fn iter(&self) -> Box + Send + '_>; - - fn iter_t(&self) -> Box + Send + '_> { - Box::new(self.iter().map(|time| time.t())) - } - - fn len(&self) -> usize; -} - -pub trait TimeIndexIntoOps: Sized { - type IndexType: AsTime; - type RangeType: TimeIndexIntoOps; - - fn into_range(self, w: Range) -> Self::RangeType; - - fn into_range_t(self, w: Range) -> Self::RangeType { - self.into_range(Self::IndexType::range(w)) - } - - fn into_iter(self) -> impl Iterator + Send; - - fn into_iter_t(self) -> impl Iterator + Send { - self.into_iter().map(|time| time.t()) - } -} - impl TimeIndexOps for TimeIndex { type IndexType = T; type RangeType<'a> diff --git a/raphtory/src/db/api/storage/graph/nodes/node_entry.rs b/raphtory/src/db/api/storage/graph/nodes/node_entry.rs index efc5c11c8d..690aaf3d57 100644 --- a/raphtory/src/db/api/storage/graph/nodes/node_entry.rs +++ b/raphtory/src/db/api/storage/graph/nodes/node_entry.rs @@ -69,7 +69,7 @@ impl<'b> NodeStorageEntry<'b> { self, layers: &LayerIds, dir: Direction, - ) -> impl Iterator + 'b { + ) -> impl Iterator + use<'b, '_> { match self { NodeStorageEntry::Mem(entry) => { StorageVariants::Mem(GenLockedIter::from(entry, |entry| { diff --git a/raphtory/src/db/api/view/mod.rs b/raphtory/src/db/api/view/mod.rs index 8456aa491a..dff00df179 100644 --- a/raphtory/src/db/api/view/mod.rs +++ b/raphtory/src/db/api/view/mod.rs @@ -27,15 +27,4 @@ pub use node_property_filter::NodePropertyFilterOps; pub use reset_filter::*; pub use time::*; -pub type BoxedIter = Box + Send>; -pub type BoxedLIter<'a, T> = Box + Send + 'a>; - -pub trait IntoDynBoxed<'a, T> { - fn into_dyn_boxed(self) -> BoxedLIter<'a, T>; -} - -impl<'a, T, I: Iterator + Send + 'a> IntoDynBoxed<'a, T> for I { - fn into_dyn_boxed(self) -> BoxedLIter<'a, T> { - Box::new(self) - } -} +pub use raphtory_api::iter::{BoxedIter, BoxedLIter, IntoDynBoxed}; diff --git a/raphtory/src/db/graph/graph.rs b/raphtory/src/db/graph/graph.rs index 49f83120ea..3089a45a0d 100644 --- a/raphtory/src/db/graph/graph.rs +++ b/raphtory/src/db/graph/graph.rs @@ -608,73 +608,6 @@ mod db_tests { .all(|&(_, src, dst)| g.edge(src, dst).is_some()) } - #[test] - fn prop_json_test() { - let g = Graph::new(); - let _ = g.add_node(0, "A", NO_PROPS, None).unwrap(); - let _ = g.add_node(0, "B", NO_PROPS, None).unwrap(); - let e = g.add_edge(0, "A", "B", NO_PROPS, None).unwrap(); - e.add_constant_properties(vec![("aprop".to_string(), Prop::Bool(true))], None) - .unwrap(); - let ee = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERA")).unwrap(); - ee.add_constant_properties( - vec![("aprop".to_string(), Prop::Bool(false))], - Some("LAYERA"), - ) - .unwrap(); - let json_res = g - .edge("A", "B") - .unwrap() - .properties() - .constant() - .get("aprop") - .unwrap() - .to_json(); - let json_as_map = json_res.as_object().unwrap(); - assert_eq!(json_as_map.len(), 2); - assert_eq!(json_as_map.get("LAYERA"), Some(&Value::Bool(false))); - assert_eq!(json_as_map.get("_default"), Some(&Value::Bool(true))); - - let eee = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERB")).unwrap(); - let v: Vec = vec![Prop::Bool(true), Prop::Bool(false), Prop::U64(0)]; - eee.add_constant_properties( - vec![("bprop".to_string(), Prop::List(Arc::new(v)))], - Some("LAYERB"), - ) - .unwrap(); - let json_res = g - .edge("A", "B") - .unwrap() - .properties() - .constant() - .get("bprop") - .unwrap() - .to_json(); - let list_res = json_res.as_object().unwrap().get("LAYERB").unwrap(); - assert_eq!(list_res.as_array().unwrap().len(), 3); - - let eeee = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERC")).unwrap(); - let v: HashMap = HashMap::from([ - (ArcStr::from("H".to_string()), Prop::Bool(false)), - (ArcStr::from("Y".to_string()), Prop::U64(0)), - ]); - eeee.add_constant_properties( - vec![("mymap".to_string(), Prop::Map(Arc::new(v)))], - Some("LAYERC"), - ) - .unwrap(); - let json_res = g - .edge("A", "B") - .unwrap() - .properties() - .constant() - .get("mymap") - .unwrap() - .to_json(); - let map_res = json_res.as_object().unwrap().get("LAYERC").unwrap(); - assert_eq!(map_res.as_object().unwrap().len(), 2); - } - #[test] fn import_from_another_graph() { let g = Graph::new(); diff --git a/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs b/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs index d8e8388e03..f0c124ae72 100644 --- a/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs @@ -9,7 +9,6 @@ use crate::{ tprop_storage_ops::TPropOps, variants::layer_variants::LayerVariants, }, - disk_graph::graph_impl::tprops::read_tprop_column, }; use pometry_storage::{edge::Edge, tprops::DiskTProp}; use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, storage::timeindex::TimeIndexEntry}; @@ -110,11 +109,12 @@ impl<'a> EdgeStorageOps<'a> for Edge<'a> { fn temporal_prop_layer(self, layer_id: usize, prop_id: usize) -> impl TPropOps<'a> + Sync + 'a { self.graph() .localize_edge_prop_id(layer_id, prop_id) - .and_then(|local_id| { - self.temporal_prop_layer_inner(layer_id, local_id) - .map(|field| (field, local_id)) + .map(|prop_id| { + self.graph() + .layer(layer_id) + .edges_storage() + .prop(self.eid(), prop_id) }) - .and_then(|(field, prop_id)| read_tprop_column(self, prop_id, layer_id, field)) .unwrap_or(DiskTProp::empty()) } diff --git a/raphtory/src/disk_graph/graph_impl/mod.rs b/raphtory/src/disk_graph/graph_impl/mod.rs index 7b9dd344bc..6a909aa574 100644 --- a/raphtory/src/disk_graph/graph_impl/mod.rs +++ b/raphtory/src/disk_graph/graph_impl/mod.rs @@ -5,7 +5,6 @@ use crate::{core::utils::errors::GraphError, disk_graph::DiskGraphStorage, prelu mod edge_storage_ops; mod interop; pub mod prop_conversion; -mod time_index_into_ops; pub mod tprops; #[derive(Debug)] diff --git a/raphtory/src/disk_graph/graph_impl/time_index_into_ops.rs b/raphtory/src/disk_graph/graph_impl/time_index_into_ops.rs deleted file mode 100644 index 74f35a4995..0000000000 --- a/raphtory/src/disk_graph/graph_impl/time_index_into_ops.rs +++ /dev/null @@ -1,176 +0,0 @@ -use crate::{ - core::storage::timeindex::{TimeIndexIntoOps, TimeIndexOps}, - db::api::view::IntoDynBoxed, -}; -use pometry_storage::{ - prelude::{ArrayOps, BaseArrayOps}, - timestamps::TimeStamps, -}; -use raphtory_api::core::storage::timeindex::TimeIndexEntry; -use std::ops::Range; - -impl<'a> TimeIndexIntoOps for TimeStamps<'a, TimeIndexEntry> { - type IndexType = TimeIndexEntry; - - type RangeType = Self; - - fn into_range(self, w: Range) -> Self { - let start = self.position(&w.start); - let end = self.position(&w.end); - let (timestamps, sec_index) = self.into_inner(); - TimeStamps::new( - timestamps.sliced(start..end), - sec_index.map(|sec_index| sec_index.sliced(start..end)), - ) - } - - #[allow(refining_impl_trait)] - fn into_iter(self) -> impl Iterator + Send + 'static { - let (timestamps, sec_index) = self.into_inner(); - let sec_iter: Box + Send> = sec_index - .map(|v| v.into_owned().map(|i| i as usize).into_dyn_boxed()) - .unwrap_or(self.timestamps().range().clone().into_dyn_boxed()); - timestamps - .into_owned() - .zip(sec_iter) - .map(|(t, s)| TimeIndexEntry(t, s)) - } -} -impl<'a> TimeIndexIntoOps for TimeStamps<'a, i64> { - type IndexType = i64; - - type RangeType = Self; - - fn into_range(self, w: Range) -> Self { - let start = self.timestamps().partition_point(|i| i < w.start); - let end = self.timestamps().partition_point(|i| i < w.end); - let (timestamps, _) = self.into_inner(); - TimeStamps::new(timestamps.sliced(start..end), None) - } - fn into_iter(self) -> impl Iterator + Send { - let (timestamps, _) = self.into_inner(); - timestamps - } -} - -impl<'a> TimeIndexOps for TimeStamps<'a, TimeIndexEntry> { - type IndexType = TimeIndexEntry; - type RangeType<'b> - = TimeStamps<'b, TimeIndexEntry> - where - Self: 'b; - - fn len(&self) -> usize { - self.timestamps().len() - } - - fn active(&self, w: Range) -> bool { - let i = self.position(&w.start); - i < self.timestamps().len() && self.get(i) < w.end - } - - fn range(&self, w: Range) -> Self::RangeType<'_> { - let start = self.position(&w.start); - let end = self.position(&w.end); - TimeStamps::new( - self.timestamps().sliced(start..end), - self.sec_index() - .map(|sec_index| sec_index.sliced(start..end)), - ) - } - - fn first_t(&self) -> Option { - (self.timestamps().len() > 0).then(|| self.timestamps().get(0)) - } - - fn first(&self) -> Option { - if self.timestamps().len() == 0 { - return None; - } - let t = self.timestamps().get(0); - let sec = self.sec_index().as_ref().map(|arr| arr.get(0)).unwrap_or(0); - - Some(TimeIndexEntry::new(t, sec as usize)) - } - - fn last_t(&self) -> Option { - (self.timestamps().len() > 0).then(|| self.timestamps().get(self.timestamps().len() - 1)) - } - - fn last(&self) -> Option { - if self.timestamps().len() == 0 { - return None; - } - let last_idx = self.timestamps().len() - 1; - - let t = self.timestamps().get(last_idx); - let sec = self - .sec_index() - .as_ref() - .map(|arr| arr.get(last_idx)) - .unwrap_or(0); - - Some(TimeIndexEntry::new(t, sec as usize)) - } - - fn iter(&self) -> Box + Send + 'a> { - let sec_iter = self - .sec_index() - .map(|v| v.map(|i| i as usize).into_dyn_boxed()) - .unwrap_or(self.timestamps().range().clone().into_dyn_boxed()); - Box::new( - self.timestamps() - .into_iter() - .zip(sec_iter) - .map(|(t, s)| TimeIndexEntry(t, s)), - ) - } -} -impl<'a> TimeIndexOps for TimeStamps<'a, i64> { - type IndexType = i64; - type RangeType<'b> - = TimeStamps<'b, i64> - where - Self: 'b; - - fn len(&self) -> usize { - self.timestamps().len() - } - fn active(&self, w: Range) -> bool { - let i = self.timestamps().insertion_point(w.start); - i < self.timestamps().len() && self.timestamps().get(i) < w.end - } - - fn range(&self, w: Range) -> Self::RangeType<'_> { - let start = self.timestamps().partition_point(|i| i < w.start); - let end = self.timestamps().partition_point(|i| i < w.end); - TimeStamps::new( - self.timestamps().sliced(start..end), - self.sec_index() - .map(|sec_index| sec_index.sliced(start..end)), - ) - } - - fn first(&self) -> Option { - if self.timestamps().len() == 0 { - return None; - } - let t = self.timestamps().get(0); - Some(t) - } - - fn last(&self) -> Option { - if self.timestamps().len() == 0 { - return None; - } - - let last_idx = self.timestamps().len() - 1; - - let t = self.timestamps().get(last_idx); - Some(t) - } - - fn iter(&self) -> Box + Send + '_> { - Box::new(self.timestamps().into_iter()) - } -} diff --git a/raphtory/src/disk_graph/graph_impl/tprops.rs b/raphtory/src/disk_graph/graph_impl/tprops.rs index 492360e9d3..9591c98174 100644 --- a/raphtory/src/disk_graph/graph_impl/tprops.rs +++ b/raphtory/src/disk_graph/graph_impl/tprops.rs @@ -1,8 +1,5 @@ use crate::{ - arrow2::{ - datatypes::{ArrowDataType as DataType, Field}, - types::{NativeType, Offset}, - }, + arrow2::types::{NativeType, Offset}, core::storage::timeindex::TimeIndexIntoOps, db::api::{storage::graph::tprop_storage_ops::TPropOps, view::IntoDynBoxed}, prelude::Prop, @@ -10,9 +7,7 @@ use crate::{ use polars_arrow::array::Array; use pometry_storage::{ chunked_array::{bool_col::ChunkedBoolCol, col::ChunkedPrimitiveCol, utf8_col::StringCol}, - edge::Edge, prelude::{ArrayOps, BaseArrayOps}, - timestamps::TimeStamps, tprops::{DiskTProp, EmptyTProp, TPropColumn}, }; use raphtory_api::core::storage::timeindex::TimeIndexEntry; @@ -187,55 +182,6 @@ impl<'a, I: Offset> TPropOps<'a> for TPropColumn<'a, StringCol<'a, I>, TimeIndex } } -fn new_tprop_column( - edge: Edge, - id: usize, - layer_id: usize, -) -> Option, TimeIndexEntry>> -where - Prop: From, -{ - let props = edge.prop_values::(id, layer_id)?; - let timestamps = TimeStamps::new(edge.timestamp_slice(layer_id), None); - Some(TPropColumn::new(props, timestamps)) -} - -pub fn read_tprop_column( - edge: Edge, - id: usize, - layer_id: usize, - field: Field, -) -> Option> { - match field.data_type() { - DataType::Boolean => { - let props = edge.prop_bool_values(id, layer_id)?; - let timestamps = TimeStamps::new(edge.timestamp_slice(layer_id), None); - Some(DiskTProp::Bool(TPropColumn::new(props, timestamps))) - } - DataType::Int64 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::I64), - DataType::Int32 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::I32), - DataType::UInt32 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::U32), - DataType::UInt64 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::U64), - DataType::Float32 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::F32), - DataType::Float64 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::F64), - DataType::Utf8 => { - let props = edge.prop_str_values::(id, layer_id)?; - let timestamps = TimeStamps::new(edge.timestamp_slice(layer_id), None); - Some(DiskTProp::Str32(TPropColumn::new(props, timestamps))) - } - DataType::LargeUtf8 => { - let props = edge.prop_str_values::(id, layer_id)?; - let timestamps = TimeStamps::new(edge.timestamp_slice(layer_id), None); - Some(DiskTProp::Str64(TPropColumn::new(props, timestamps))) - } - DataType::Date64 => new_tprop_column::(edge, id, layer_id).map(DiskTProp::I64), - DataType::Timestamp(_, _) => { - new_tprop_column::(edge, id, layer_id).map(DiskTProp::I64) - } - _ => todo!(), - } -} - impl<'a> TPropOps<'a> for EmptyTProp { fn last_before(&self, _t: TimeIndexEntry) -> Option<(TimeIndexEntry, Prop)> { None diff --git a/raphtory/src/disk_graph/storage_interface/node.rs b/raphtory/src/disk_graph/storage_interface/node.rs index cf99884f03..29706e57a2 100644 --- a/raphtory/src/disk_graph/storage_interface/node.rs +++ b/raphtory/src/disk_graph/storage_interface/node.rs @@ -15,7 +15,7 @@ use crate::{ }; use itertools::Itertools; use polars_arrow::datatypes::ArrowDataType; -use pometry_storage::{graph::TemporalGraph, timestamps::TimeStamps, GidRef}; +use pometry_storage::{graph::TemporalGraph, timestamps::TimeStamps, tprops::DiskTProp, GidRef}; use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use std::{borrow::Cow, iter, sync::Arc}; @@ -35,11 +35,14 @@ impl<'a> DiskNode<'a> { } } - pub fn temporal_node_prop_ids(self) -> Box + 'a> { - match &self.graph.node_properties().temporal_props { - Some(props) => Box::new(props.prop_dtypes().iter().enumerate().map(|(i, _)| i)), - None => Box::new(std::iter::empty()), - } + pub fn temporal_node_prop_ids(self) -> impl Iterator + 'a { + self.graph + .prop_mapping() + .nodes() + .into_iter() + .enumerate() + .filter(|(_, exists)| exists.is_some()) + .map(|(id, _)| id) } pub(crate) fn new(graph: &'a TemporalGraph, vid: VID) -> Self { @@ -176,13 +179,14 @@ impl<'a> DiskNode<'a> { .collect::>(), }; - if let Some(props) = &self.graph.node_properties().temporal_props { + for props in self.graph.node_properties().temporal_props() { let timestamps = props.timestamps::(self.vid); if timestamps.len() > 0 { let ts = timestamps.times(); additions.push(ts); } } + NodeAdditions::Col(additions) } } @@ -239,11 +243,16 @@ impl<'a> NodeStorageOps<'a> for DiskNode<'a> { fn tprop(self, prop_id: usize) -> impl TPropOps<'a> { self.graph - .node_properties() - .temporal_props - .as_ref() - .unwrap() - .prop(self.vid, prop_id) + .prop_mapping() + .localise_node_prop_id(prop_id) + .and_then(|(layer, local_prop_id)| { + self.graph + .node_properties() + .temporal_props() + .get(layer) + .map(|t_props| t_props.prop(self.vid, local_prop_id)) + }) + .unwrap_or(DiskTProp::empty()) } fn prop(self, prop_id: usize) -> Option { @@ -268,7 +277,7 @@ impl<'a> NodeStorageOps<'a> for DiskNode<'a> { self, layers: &LayerIds, dir: Direction, - ) -> Box + Send + 'a> { + ) -> impl Iterator + Send + 'a { //FIXME: something is capturing the &LayerIds lifetime when using impl Iterator Box::new(match dir { Direction::OUT => DirectionVariants::Out(self.out_edges(layers)), diff --git a/raphtory/src/io/arrow/df_loaders.rs b/raphtory/src/io/arrow/df_loaders.rs index 80ec595780..1a54ddf88c 100644 --- a/raphtory/src/io/arrow/df_loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -15,6 +15,7 @@ use crate::{ serialise::incremental::InternalCache, }; use bytemuck::checked::cast_slice_mut; +#[cfg(feature = "python")] use kdam::{Bar, BarBuilder, BarExt}; use raphtory_api::{ atomic_extra::atomic_usize_from_mut_slice, @@ -27,6 +28,7 @@ use raphtory_api::{ use rayon::prelude::*; use std::{collections::HashMap, sync::atomic::Ordering}; +#[cfg(feature = "python")] fn build_progress_bar(des: String, num_rows: usize) -> Result { BarBuilder::default() .desc(des) @@ -414,6 +416,7 @@ pub(crate) fn load_edge_deletions_from_df< None }; let layer_index = layer_index.transpose()?; + #[cfg(feature = "python")] let mut pb = build_progress_bar("Loading edge deletions".to_string(), df_view.num_rows)?; let mut start_idx = graph.reserve_event_ids(df_view.num_rows)?; @@ -436,6 +439,7 @@ pub(crate) fn load_edge_deletions_from_df< graph.delete_edge((time, start_idx + idx), src, dst, layer)?; Ok::<(), GraphError>(()) })?; + #[cfg(feature = "python")] let _ = pb.update(df.len()); start_idx += df.len(); } @@ -480,6 +484,7 @@ pub(crate) fn load_node_props_from_df< .collect::, GraphError>>()?, None => vec![], }; + #[cfg(feature = "python")] let mut pb = build_progress_bar("Loading node properties".to_string(), df_view.num_rows)?; for chunk in df_view.chunks { let df = chunk?; @@ -512,6 +517,7 @@ pub(crate) fn load_node_props_from_df< } Ok::<(), GraphError>(()) })?; + #[cfg(feature = "python")] let _ = pb.update(df.len()); } Ok(()) @@ -543,6 +549,7 @@ pub(crate) fn load_edges_props_from_df< None }; let layer_index = layer_index.transpose()?; + #[cfg(feature = "python")] let mut pb = build_progress_bar("Loading edge properties".to_string(), df_view.num_rows)?; let shared_constant_properties = match shared_constant_properties { None => { @@ -596,6 +603,7 @@ pub(crate) fn load_edges_props_from_df< } Ok::<(), GraphError>(()) })?; + #[cfg(feature = "python")] let _ = pb.update(df.len()); } Ok(()) diff --git a/raphtory/src/python/graph/disk_graph.rs b/raphtory/src/python/graph/disk_graph.rs index 70b9f7b084..4cb1f9fa64 100644 --- a/raphtory/src/python/graph/disk_graph.rs +++ b/raphtory/src/python/graph/disk_graph.rs @@ -14,7 +14,7 @@ use crate::{ python::{graph::graph::PyGraph, types::repr::StructReprBuilder}, }; use itertools::Itertools; -use pometry_storage::graph::load_node_const_properties; +use pometry_storage::graph::{load_node_const_properties, TemporalGraph}; use pyo3::{exceptions::PyRuntimeError, prelude::*, pybacked::PyBackedStr, types::PyDict}; use std::{ ops::Deref, @@ -262,6 +262,19 @@ impl PyDiskGraph { Self::load_from_dir(self.graph_dir().to_path_buf()) } + #[pyo3(signature = (location, chunk_size=20_000_000))] + pub fn append_node_temporal_properties( + &self, + location: &str, + chunk_size: usize, + ) -> Result { + let path = PathBuf::from_str(location).unwrap(); + let chunks = read_struct_arrays(&path, None)?; + let mut graph = TemporalGraph::new(self.graph.inner().graph_dir())?; + graph.load_temporal_node_props_from_chunks(chunks, chunk_size, false)?; + Self::load_from_dir(self.graph_dir().to_path_buf()) + } + /// Merge this graph with another `DiskGraph`. Note that both graphs should have nodes that are /// sorted by their global ids or the resulting graph will be nonsense! fn merge_by_sorted_gids( diff --git a/raphtory/src/python/packages/vectors.rs b/raphtory/src/python/packages/vectors.rs index 84604d9bbc..40fd2c4d18 100644 --- a/raphtory/src/python/packages/vectors.rs +++ b/raphtory/src/python/packages/vectors.rs @@ -1,12 +1,6 @@ use crate::{ - core::{ - utils::{errors::GraphError, time::IntoTime}, - DocumentInput, Lifespan, Prop, - }, - db::api::{ - properties::{internal::PropertiesOps, Properties}, - view::{MaterializedGraph, StaticGraphViewOps}, - }, + core::utils::{errors::GraphError, time::IntoTime}, + db::api::view::{MaterializedGraph, StaticGraphViewOps}, prelude::{EdgeViewOps, GraphViewOps, NodeViewOps}, python::{ graph::{edge::PyEdge, node::PyNode, views::graph_view::PyGraphView}, @@ -21,7 +15,6 @@ use crate::{ Document, Embedding, EmbeddingFunction, EmbeddingResult, }, }; -use chrono::DateTime; use futures_util::future::BoxFuture; use itertools::Itertools; use pyo3::{ @@ -70,17 +63,6 @@ impl<'source> FromPyObject<'source> for PyQuery { } } -fn format_time(millis: i64) -> String { - if millis == 0 { - "unknown time".to_owned() - } else { - match DateTime::from_timestamp_millis(millis) { - Some(time) => time.naive_utc().format("%Y-%m-%d %H:%M:%S").to_string(), - None => "unknown time".to_owned(), - } - } -} - impl PyDocument { pub fn extract_rust_document(&self, py: Python) -> Result { if let (Some(entity), Some(embedding)) = (&self.entity, &self.embedding) { @@ -173,68 +155,6 @@ pub fn into_py_document( } } -/// This funtions ignores the time history of temporal props if their type is Document and they have a life different than Lifespan::Inherited -fn get_documents_from_props( - properties: Properties

, - name: &str, -) -> Box> { - let prop = properties.temporal().get(name); - - match prop { - Some(prop) => { - let props = prop.into_iter(); - let docs = props - .map(|(time, prop)| prop_to_docs(&prop, Lifespan::event(time)).collect_vec()) - .flatten(); - Box::new(docs) - } - None => match properties.get(name) { - Some(prop) => Box::new( - prop_to_docs(&prop, Lifespan::Inherited) - .collect_vec() - .into_iter(), - ), - _ => Box::new(std::iter::empty()), - }, - } -} - -impl Lifespan { - fn overwrite_inherited(&self, default_lifespan: Lifespan) -> Self { - match self { - Lifespan::Inherited => default_lifespan, - other => other.clone(), - } - } -} - -fn prop_to_docs( - prop: &Prop, - default_lifespan: Lifespan, -) -> Box + '_> { - match prop { - Prop::List(docs) => Box::new( - docs.iter() - .map(move |prop| prop_to_docs(prop, default_lifespan)) - .flatten(), - ), - Prop::Map(doc_map) => Box::new( - doc_map - .values() - .map(move |prop| prop_to_docs(prop, default_lifespan)) - .flatten(), - ), - Prop::Document(document) => Box::new(std::iter::once(DocumentInput { - life: document.life.overwrite_inherited(default_lifespan), - ..document.clone() - })), - prop => Box::new(std::iter::once(DocumentInput { - content: prop.to_string(), - life: default_lifespan, - })), - } -} - #[pymethods] impl PyGraphView { /// Create a VectorisedGraph from the current graph diff --git a/raphtory/src/serialise/incremental.rs b/raphtory/src/serialise/incremental.rs index f5a92558c0..5e17929084 100644 --- a/raphtory/src/serialise/incremental.rs +++ b/raphtory/src/serialise/incremental.rs @@ -232,7 +232,7 @@ impl GraphWriter { } } -pub(crate) trait InternalCache { +pub trait InternalCache { /// Initialise the cache by pointing it at a proto file. /// Future updates will be appended to the cache. fn init_cache(&self, path: &GraphFolder) -> Result<(), GraphError>; diff --git a/scripts/activate_private_storage.py b/scripts/activate_private_storage.py index 319c5fcd09..bea1370ebe 100755 --- a/scripts/activate_private_storage.py +++ b/scripts/activate_private_storage.py @@ -14,7 +14,7 @@ if "#[private-storage]" in line: next_line = lines[i + 1] if next_line.strip().startswith("#") and "pometry-storage" in next_line: - lines[i + 1] = re.sub(r"#\s*", "", next_line, 1) + lines[i + 1] = re.sub(r"#\s*", "", next_line, count=1) if "#[public-storage]" in line: next_line = lines[i + 1] if next_line.strip().startswith("pometry-storage"):