Skip to content

Commit

Permalink
Sparse Node temporal props (#1848)
Browse files Browse the repository at this point in the history
* Add support for sparse temporal properties for nodes

* remove the need for name when adding sparse properties layer

* remove some unused code and relegate progress bars to python

* add time to PartialEq check on Properties

* fix py3.13 warning and update storage
  • Loading branch information
fabianmurariu authored Nov 20, 2024
1 parent a12e398 commit 9cdacae
Show file tree
Hide file tree
Showing 23 changed files with 135 additions and 512 deletions.
2 changes: 1 addition & 1 deletion pometry-storage-private
56 changes: 56 additions & 0 deletions raphtory-api/src/core/storage/timeindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,62 @@ pub trait AsTime: fmt::Debug + Copy + Ord + Eq + Send + Sync + 'static {
fn new(t: i64, s: usize) -> Self;
}

/// Consuming counterpart of the time-index operations: every method takes `self` by value.
///
/// Implementors supply [`into_range`](Self::into_range) and
/// [`into_iter`](Self::into_iter); the `_t` variants are derived from them and work
/// with raw `i64` timestamps instead of `IndexType` values.
pub trait TimeIndexIntoOps: Sized {
    /// Entry type stored in the index.
    type IndexType: AsTime;
    /// Type returned by `into_range`; it must expose the same `IndexType`.
    type RangeType: TimeIndexIntoOps<IndexType = Self::IndexType>;

    /// Consumes the index and produces its `RangeType` for the window `w`.
    fn into_range(self, w: Range<Self::IndexType>) -> Self::RangeType;

    /// Same as `into_range`, with the window given as raw `i64` timestamps.
    ///
    /// The bounds are converted with `IndexType::range` before delegating.
    fn into_range_t(self, w: Range<i64>) -> Self::RangeType {
        let window = Self::IndexType::range(w);
        self.into_range(window)
    }

    /// Consumes the index and iterates over its entries.
    fn into_iter(self) -> impl Iterator<Item = Self::IndexType> + Send;

    /// Consumes the index and iterates over the `t()` value of each entry.
    fn into_iter_t(self) -> impl Iterator<Item = i64> + Send {
        let entries = self.into_iter();
        entries.map(|entry| entry.t())
    }
}

/// Borrowing operations over a time index.
///
/// Implementors supply `active`, `range`, `first`, `last`, `iter` and `len`; the
/// `_t` variants and `is_empty` are provided on top of them. The `_t` variants
/// accept or return raw `i64` timestamps instead of `IndexType` values.
pub trait TimeIndexOps: Send + Sync {
    /// Entry type stored in the index.
    type IndexType: AsTime;
    /// Borrowed view returned by `range`; it must expose the same `IndexType`.
    type RangeType<'a>: TimeIndexOps<IndexType = Self::IndexType> + 'a
    where
        Self: 'a;

    /// Reports whether the index is active for the window `w`.
    // NOTE(review): presumably "has at least one entry inside `w`" — confirm with implementors.
    fn active(&self, w: Range<Self::IndexType>) -> bool;

    /// Same as `active`, with the window given as raw `i64` timestamps
    /// (converted with `IndexType::range`).
    fn active_t(&self, w: Range<i64>) -> bool {
        self.active(Self::IndexType::range(w))
    }

    /// Returns the `RangeType` view of this index for the window `w`.
    fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType<'_>;

    /// Same as `range`, with the window given as raw `i64` timestamps
    /// (converted with `IndexType::range`).
    fn range_t(&self, w: Range<i64>) -> Self::RangeType<'_> {
        self.range(Self::IndexType::range(w))
    }

    /// `t()` of the entry returned by `first`, if any.
    fn first_t(&self) -> Option<i64> {
        self.first().map(|ti| ti.t())
    }

    /// First entry of the index, or `None` when the implementor has none to report.
    fn first(&self) -> Option<Self::IndexType>;

    /// `t()` of the entry returned by `last`, if any.
    fn last_t(&self) -> Option<i64> {
        self.last().map(|ti| ti.t())
    }

    /// Last entry of the index, or `None` when the implementor has none to report.
    fn last(&self) -> Option<Self::IndexType>;

    /// Boxed iterator over the entries of the index.
    fn iter(&self) -> Box<dyn Iterator<Item = Self::IndexType> + Send + '_>;

    /// Boxed iterator over the `t()` value of every entry produced by `iter`.
    fn iter_t(&self) -> Box<dyn Iterator<Item = i64> + Send + '_> {
        Box::new(self.iter().map(|time| time.t()))
    }

    /// Number of entries in the index.
    fn len(&self) -> usize;

    /// Returns `true` when `len()` is zero.
    ///
    /// Provided so that every type exposing `len` also exposes `is_empty`;
    /// implementors may override it with a cheaper check.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl From<i64> for TimeIndexEntry {
fn from(value: i64) -> Self {
Self::start(value)
Expand Down
12 changes: 12 additions & 0 deletions raphtory-api/src/iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/// Boxed, dynamically dispatched `Send` iterator with the default `'static` object lifetime.
pub type BoxedIter<T> = Box<dyn Iterator<Item = T> + Send>;
/// Boxed, dynamically dispatched `Send` iterator that is valid for the lifetime `'a`.
pub type BoxedLIter<'a, T> = Box<dyn Iterator<Item = T> + Send + 'a>;

/// Conversion of a value into a [`BoxedLIter`].
pub trait IntoDynBoxed<'a, T> {
    /// Consumes `self` and returns it as a boxed trait-object iterator.
    fn into_dyn_boxed(self) -> BoxedLIter<'a, T>;
}

/// Blanket implementation: every `Send` iterator that lives for `'a` can be boxed.
impl<'a, T, I> IntoDynBoxed<'a, T> for I
where
    I: Iterator<Item = T> + Send + 'a,
{
    fn into_dyn_boxed(self) -> BoxedLIter<'a, T> {
        // Moving the iterator into a `Box` erases its concrete type.
        Box::new(self) as BoxedLIter<'a, T>
    }
}
2 changes: 2 additions & 0 deletions raphtory-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ pub mod compute;
pub mod core;
#[cfg(feature = "python")]
pub mod python;

pub mod iter;
6 changes: 3 additions & 3 deletions raphtory-cypher/src/executor/table_provider/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl EdgeListTableProvider {
.as_ref()
.layer(layer_id)
.edges_storage()
.time()
.time_col()
.values()
.len();

Expand Down Expand Up @@ -186,7 +186,7 @@ fn produce_record_batch(
let layer = graph.as_ref().layer(layer_id);
let edges = layer.edges_storage();

let chunked_lists_ts = edges.time();
let chunked_lists_ts = edges.time_col();
let offsets = chunked_lists_ts.offsets();
// FIXME: potentially implement into_iter_chunks() for chunked arrays to avoid having to collect these chunks, if it turns out to be a problem
let time_values_chunks = chunked_lists_ts
Expand Down Expand Up @@ -257,7 +257,7 @@ fn produce_record_batch(

let column_ids = layer
.edges_storage()
.data_type()
.prop_dtypes()
.iter()
.enumerate()
.skip(1) // first one is supposed to be time
Expand Down
2 changes: 1 addition & 1 deletion raphtory-cypher/src/transpiler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ fn scan_edges_as_sql_cte(

// TODO: this needs to match the schema from EdgeListTableProvider
fn full_layer_fields(graph: &DiskGraphStorage, layer_id: usize) -> Option<Fields> {
let dt = graph.as_ref().layer(layer_id).edges_props_data_type();
let dt = graph.as_ref().layer(layer_id).edges_props_data_type()?;
let arr_dt: arrow_schema::DataType = dt.clone().into();
match arr_dt {
arrow_schema::DataType::Struct(fields) => {
Expand Down
2 changes: 1 addition & 1 deletion raphtory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ quad-rand = { workspace = true }
serde_json = { workspace = true }
ouroboros = { workspace = true }
either = { workspace = true }
kdam = { workspace = true }
kdam = { workspace = true, optional = true}
bytemuck = { workspace = true }
tracing = { workspace = true }

Expand Down
4 changes: 3 additions & 1 deletion raphtory/src/core/entities/properties/graph_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use raphtory_api::core::storage::{
FxDashMap,
};
use serde::{Deserialize, Serialize};
use std::ops::{Deref, DerefMut};
#[cfg(feature = "proto")]
use std::ops::Deref;
use std::ops::DerefMut;

#[derive(Serialize, Deserialize, Debug)]
pub struct GraphMeta {
Expand Down
34 changes: 0 additions & 34 deletions raphtory/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use chrono::{DateTime, NaiveDateTime, Utc};
use itertools::Itertools;
use raphtory_api::core::storage::arc_str::ArcStr;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
cmp::Ordering,
collections::HashMap,
Expand Down Expand Up @@ -171,39 +170,6 @@ impl PartialOrd for Prop {
}

impl Prop {
pub fn to_json(&self) -> Value {
match self {
Prop::Str(value) => Value::String(value.to_string()),
Prop::U8(value) => Value::Number((*value).into()),
Prop::U16(value) => Value::Number((*value).into()),
Prop::I32(value) => Value::Number((*value).into()),
Prop::I64(value) => Value::Number((*value).into()),
Prop::U32(value) => Value::Number((*value).into()),
Prop::U64(value) => Value::Number((*value).into()),
Prop::F32(value) => Value::Number(serde_json::Number::from_f64(*value as f64).unwrap()),
Prop::F64(value) => Value::Number(serde_json::Number::from_f64(*value).unwrap()),
Prop::Bool(value) => Value::Bool(*value),
Prop::List(value) => {
let vec: Vec<Value> = value.iter().map(|v| v.to_json()).collect();
Value::Array(vec)
}
Prop::Map(value) => {
let map: serde_json::Map<String, Value> = value
.iter()
.map(|(k, v)| (k.to_string(), v.to_json()))
.collect();
Value::Object(map)
}
Prop::DTime(value) => Value::String(value.to_string()),
Prop::NDTime(value) => Value::String(value.to_string()),
Prop::Graph(_) => Value::String("Graph cannot be converted to JSON".to_string()),
Prop::PersistentGraph(_) => {
Value::String("Persistent Graph cannot be converted to JSON".to_string())
}
Prop::Document(DocumentInput { content, .. }) => Value::String(content.to_owned()), // TODO: return Value::Object ??
}
}

pub fn dtype(&self) -> PropType {
match self {
Prop::Str(_) => PropType::Str,
Expand Down
56 changes: 0 additions & 56 deletions raphtory/src/core/storage/timeindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,62 +281,6 @@ impl<'a, T: AsTime, Ops: TimeIndexOps<IndexType = T>, V: AsRef<Vec<Ops>> + Send
}
}

/// Read-only operations over a time index.
///
/// `active`, `range`, `first`, `last`, `iter` and `len` are required; the `_t`
/// helpers and `is_empty` are provided. The `_t` helpers work with raw `i64`
/// timestamps rather than `IndexType` values.
pub trait TimeIndexOps: Send + Sync {
    /// Entry type stored in the index.
    type IndexType: AsTime;
    /// Borrowed view produced by `range`, sharing this index's `IndexType`.
    type RangeType<'a>: TimeIndexOps<IndexType = Self::IndexType> + 'a
    where
        Self: 'a;

    /// Reports whether the index is active for the window `w`.
    // NOTE(review): exact meaning of "active" is defined by implementors — confirm.
    fn active(&self, w: Range<Self::IndexType>) -> bool;

    /// `active` for a window of raw `i64` timestamps, converted via `IndexType::range`.
    fn active_t(&self, w: Range<i64>) -> bool {
        self.active(Self::IndexType::range(w))
    }

    /// Returns the `RangeType` view of this index for the window `w`.
    fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType<'_>;

    /// `range` for a window of raw `i64` timestamps, converted via `IndexType::range`.
    fn range_t(&self, w: Range<i64>) -> Self::RangeType<'_> {
        self.range(Self::IndexType::range(w))
    }

    /// `t()` of the entry returned by `first`, if there is one.
    fn first_t(&self) -> Option<i64> {
        self.first().map(|ti| ti.t())
    }

    /// First entry of the index, if any.
    fn first(&self) -> Option<Self::IndexType>;

    /// `t()` of the entry returned by `last`, if there is one.
    fn last_t(&self) -> Option<i64> {
        self.last().map(|ti| ti.t())
    }

    /// Last entry of the index, if any.
    fn last(&self) -> Option<Self::IndexType>;

    /// Boxed iterator over all entries.
    fn iter(&self) -> Box<dyn Iterator<Item = Self::IndexType> + Send + '_>;

    /// Boxed iterator over the `t()` value of each entry yielded by `iter`.
    fn iter_t(&self) -> Box<dyn Iterator<Item = i64> + Send + '_> {
        Box::new(self.iter().map(|time| time.t()))
    }

    /// Number of entries in the index.
    fn len(&self) -> usize;

    /// Returns `true` when `len()` is zero.
    ///
    /// Pairs with `len` so emptiness can be checked without comparing against `0`;
    /// implementors may override it with a cheaper check.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// By-value operations over a time index: each method consumes `self`.
///
/// `into_range` and `into_iter` are required; `into_range_t` and `into_iter_t`
/// are provided adapters that work with raw `i64` timestamps.
pub trait TimeIndexIntoOps: Sized {
    /// Entry type stored in the index.
    type IndexType: AsTime;
    /// Result type of `into_range`, sharing this index's `IndexType`.
    type RangeType: TimeIndexIntoOps<IndexType = Self::IndexType>;

    /// Consumes the index and produces its `RangeType` for the window `w`.
    fn into_range(self, w: Range<Self::IndexType>) -> Self::RangeType;

    /// `into_range` for a window of raw `i64` timestamps, converted via `IndexType::range`.
    fn into_range_t(self, w: Range<i64>) -> Self::RangeType {
        let typed_window = Self::IndexType::range(w);
        self.into_range(typed_window)
    }

    /// Consumes the index and iterates over its entries.
    fn into_iter(self) -> impl Iterator<Item = Self::IndexType> + Send;

    /// Consumes the index and iterates over the `t()` value of each entry.
    fn into_iter_t(self) -> impl Iterator<Item = i64> + Send {
        self.into_iter().map(|index_entry| index_entry.t())
    }
}

impl<T: AsTime> TimeIndexOps for TimeIndex<T> {
type IndexType = T;
type RangeType<'a>
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/db/api/storage/graph/nodes/node_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<'b> NodeStorageEntry<'b> {
self,
layers: &LayerIds,
dir: Direction,
) -> impl Iterator<Item = EdgeRef> + 'b {
) -> impl Iterator<Item = EdgeRef> + use<'b, '_> {
match self {
NodeStorageEntry::Mem(entry) => {
StorageVariants::Mem(GenLockedIter::from(entry, |entry| {
Expand Down
13 changes: 1 addition & 12 deletions raphtory/src/db/api/view/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,4 @@ pub use node_property_filter::NodePropertyFilterOps;
pub use reset_filter::*;
pub use time::*;

/// Type-erased, heap-allocated `Send` iterator (default `'static` object lifetime).
pub type BoxedIter<T> = Box<dyn Iterator<Item = T> + Send>;
/// Type-erased, heap-allocated `Send` iterator bounded by the lifetime `'a`.
pub type BoxedLIter<'a, T> = Box<dyn Iterator<Item = T> + Send + 'a>;

/// Turns a value into a [`BoxedLIter`].
pub trait IntoDynBoxed<'a, T> {
    /// Consumes `self`, returning it behind a boxed iterator trait object.
    fn into_dyn_boxed(self) -> BoxedLIter<'a, T>;
}

/// Any `Send` iterator outliving `'a` gets `into_dyn_boxed` for free.
impl<'a, T, I> IntoDynBoxed<'a, T> for I
where
    I: Iterator<Item = T> + Send + 'a,
{
    fn into_dyn_boxed(self) -> BoxedLIter<'a, T> {
        // Boxing moves the iterator to the heap and hides its concrete type.
        Box::new(self) as BoxedLIter<'a, T>
    }
}
pub use raphtory_api::iter::{BoxedIter, BoxedLIter, IntoDynBoxed};
67 changes: 0 additions & 67 deletions raphtory/src/db/graph/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,73 +608,6 @@ mod db_tests {
.all(|&(_, src, dst)| g.edge(src, dst).is_some())
}

/// Checks the JSON rendering of constant edge properties set on several layers
/// of the same `A -> B` edge: a boolean, a list and a map.
#[test]
fn prop_json_test() {
    let g = Graph::new();
    let _ = g.add_node(0, "A", NO_PROPS, None).unwrap();
    let _ = g.add_node(0, "B", NO_PROPS, None).unwrap();

    // Boolean property "aprop": `true` without a layer, `false` on LAYERA.
    let default_edge = g.add_edge(0, "A", "B", NO_PROPS, None).unwrap();
    default_edge
        .add_constant_properties(vec![("aprop".to_string(), Prop::Bool(true))], None)
        .unwrap();
    let layer_a_edge = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERA")).unwrap();
    layer_a_edge
        .add_constant_properties(
            vec![("aprop".to_string(), Prop::Bool(false))],
            Some("LAYERA"),
        )
        .unwrap();
    let aprop_json = g
        .edge("A", "B")
        .unwrap()
        .properties()
        .constant()
        .get("aprop")
        .unwrap()
        .to_json();
    let aprop_by_layer = aprop_json.as_object().unwrap();
    assert_eq!(aprop_by_layer.len(), 2);
    assert_eq!(aprop_by_layer.get("LAYERA"), Some(&Value::Bool(false)));
    assert_eq!(aprop_by_layer.get("_default"), Some(&Value::Bool(true)));

    // List property "bprop" on LAYERB: expected as a JSON array of three items.
    let layer_b_edge = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERB")).unwrap();
    let list_values: Vec<Prop> = vec![Prop::Bool(true), Prop::Bool(false), Prop::U64(0)];
    layer_b_edge
        .add_constant_properties(
            vec![("bprop".to_string(), Prop::List(Arc::new(list_values)))],
            Some("LAYERB"),
        )
        .unwrap();
    let bprop_json = g
        .edge("A", "B")
        .unwrap()
        .properties()
        .constant()
        .get("bprop")
        .unwrap()
        .to_json();
    let layer_b_list = bprop_json.as_object().unwrap().get("LAYERB").unwrap();
    assert_eq!(layer_b_list.as_array().unwrap().len(), 3);

    // Map property "mymap" on LAYERC: expected as a JSON object with two keys.
    let layer_c_edge = g.add_edge(0, "A", "B", NO_PROPS, Some("LAYERC")).unwrap();
    let map_values: HashMap<ArcStr, Prop> = HashMap::from([
        (ArcStr::from("H".to_string()), Prop::Bool(false)),
        (ArcStr::from("Y".to_string()), Prop::U64(0)),
    ]);
    layer_c_edge
        .add_constant_properties(
            vec![("mymap".to_string(), Prop::Map(Arc::new(map_values)))],
            Some("LAYERC"),
        )
        .unwrap();
    let mymap_json = g
        .edge("A", "B")
        .unwrap()
        .properties()
        .constant()
        .get("mymap")
        .unwrap()
        .to_json();
    let layer_c_map = mymap_json.as_object().unwrap().get("LAYERC").unwrap();
    assert_eq!(layer_c_map.as_object().unwrap().len(), 2);
}

#[test]
fn import_from_another_graph() {
let g = Graph::new();
Expand Down
10 changes: 5 additions & 5 deletions raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
tprop_storage_ops::TPropOps,
variants::layer_variants::LayerVariants,
},
disk_graph::graph_impl::tprops::read_tprop_column,
};
use pometry_storage::{edge::Edge, tprops::DiskTProp};
use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, storage::timeindex::TimeIndexEntry};
Expand Down Expand Up @@ -110,11 +109,12 @@ impl<'a> EdgeStorageOps<'a> for Edge<'a> {
fn temporal_prop_layer(self, layer_id: usize, prop_id: usize) -> impl TPropOps<'a> + Sync + 'a {
self.graph()
.localize_edge_prop_id(layer_id, prop_id)
.and_then(|local_id| {
self.temporal_prop_layer_inner(layer_id, local_id)
.map(|field| (field, local_id))
.map(|prop_id| {
self.graph()
.layer(layer_id)
.edges_storage()
.prop(self.eid(), prop_id)
})
.and_then(|(field, prop_id)| read_tprop_column(self, prop_id, layer_id, field))
.unwrap_or(DiskTProp::empty())
}

Expand Down
1 change: 0 additions & 1 deletion raphtory/src/disk_graph/graph_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{core::utils::errors::GraphError, disk_graph::DiskGraphStorage, prelu
mod edge_storage_ops;
mod interop;
pub mod prop_conversion;
mod time_index_into_ops;
pub mod tprops;

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 9cdacae

Please sign in to comment.