From 36538937ec9e5075f9fc50452bdc90459fd40f8d Mon Sep 17 00:00:00 2001 From: Shivam Kapoor <4599890+iamsmkr@users.noreply.github.com> Date: Fri, 24 May 2024 16:27:25 +0100 Subject: [PATCH] fix arrowgraph for gql --- Cargo.lock | 1 + raphtory-graphql/Cargo.toml | 3 + raphtory-graphql/src/lib.rs | 82 +++++++- raphtory-graphql/src/server.rs | 4 +- raphtory/src/arrow/graph_impl/core_ops.rs | 2 +- raphtory/src/arrow/graph_impl/mod.rs | 182 ++++++++++++++++-- .../src/arrow/graph_impl/prop_conversion.rs | 1 + .../src/db/api/view/internal/materialize.rs | 11 +- raphtory/src/python/graph/views/graph_view.rs | 1 + raphtory/src/search/mod.rs | 15 +- 10 files changed, 281 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a385ab7f09..ade6e9e08f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3559,6 +3559,7 @@ dependencies = [ "polars-error", "polars-utils", "ryu", + "serde", "simdutf8", "streaming-iterator", "strength_reduce", diff --git a/raphtory-graphql/Cargo.toml b/raphtory-graphql/Cargo.toml index b787fcc79b..8c47b69b59 100644 --- a/raphtory-graphql/Cargo.toml +++ b/raphtory-graphql/Cargo.toml @@ -45,3 +45,6 @@ toml = { workspace = true } [dev-dependencies] tempfile = { workspace = true } + +[features] +arrow = ["raphtory/arrow"] diff --git a/raphtory-graphql/src/lib.rs b/raphtory-graphql/src/lib.rs index c6372c3748..f8d04e36f6 100644 --- a/raphtory-graphql/src/lib.rs +++ b/raphtory-graphql/src/lib.rs @@ -40,12 +40,13 @@ mod graphql_test { use async_graphql::UploadValue; use dynamic_graphql::{Request, Variables}; use raphtory::{ + arrow::graph_impl::ArrowGraph, db::{api::view::IntoDynamic, graph::views::deletion_graph::PersistentGraph}, prelude::*, }; use serde_json::json; - use std::collections::HashMap; - use tempfile::tempdir; + use std::{collections::HashMap, path::Path}; + use tempfile::{tempdir, TempDir}; #[tokio::test] async fn search_for_gandalf_query() { @@ -67,6 +68,7 @@ mod graphql_test { ) .expect("Could not add node!"); + let graph: MaterializedGraph = graph.into(); let graphs = HashMap::from([("lotr".to_string(), graph)]); let data = Data::from_map(graphs); let schema = App::create_schema().data(data).finish().unwrap(); @@ -105,6 +107,7 @@ mod graphql_test { .add_node(0, 11, NO_PROPS, None) .expect("Could not add node!"); + let graph: MaterializedGraph = graph.into(); let graphs = HashMap::from([("lotr".to_string(), graph)]); let data = Data::from_map(graphs); @@ -152,7 +155,7 @@ mod graphql_test { None, ) .unwrap(); - + let graph: MaterializedGraph = graph.into(); let graphs = HashMap::from([("graph".to_string(), graph)]); let data = Data::from_map(graphs); let schema = App::create_schema().data(data).finish().unwrap(); @@ -474,4 +477,77 @@ mod graphql_test { let graph_roundtrip = url_decode_graph(graph_encoded).unwrap().into_dynamic(); assert_eq!(g, graph_roundtrip); } + + #[cfg(feature = "arrow")] + #[tokio::test] + async fn test_arrow_graph() { + let graph = Graph::new(); + graph.add_constant_properties([("name", "graph")]).unwrap(); + graph.add_node(1, 1, NO_PROPS, Some("a")).unwrap(); + graph.add_node(1, 2, NO_PROPS, Some("b")).unwrap(); + graph.add_node(1, 3, NO_PROPS, Some("b")).unwrap(); + graph.add_node(1, 4, NO_PROPS, Some("a")).unwrap(); + graph.add_node(1, 5, NO_PROPS, Some("c")).unwrap(); + graph.add_node(1, 6, NO_PROPS, Some("e")).unwrap(); + graph.add_edge(22, 1, 2, NO_PROPS, Some("a")).unwrap(); + graph.add_edge(22, 3, 2, NO_PROPS, Some("a")).unwrap(); + graph.add_edge(22, 2, 4, NO_PROPS, Some("a")).unwrap(); + graph.add_edge(22, 4, 5, NO_PROPS, Some("a")).unwrap(); + graph.add_edge(22, 4, 5, NO_PROPS, Some("a")).unwrap(); + graph.add_edge(22, 5, 6, NO_PROPS, Some("a")).unwrap(); + graph.add_edge(22, 3, 6, NO_PROPS, Some("a")).unwrap(); + + let test_dir = TempDir::new().unwrap(); + let arrow_graph = ArrowGraph::from_graph(&graph, test_dir.path()).unwrap(); + let graph: MaterializedGraph = arrow_graph.into(); + + let graphs = HashMap::from([("graph".to_string(), graph)]); + let data = Data::from_map(graphs); + let schema = App::create_schema().data(data).finish().unwrap(); + + let req = r#" + { + graph(name: "graph") { + nodes { + list { + name + } + } + } + } + "#; + + let req = Request::new(req); + let res = schema.execute(req).await; + let data = res.data.into_json().unwrap(); + assert_eq!( + data, + json!({ + "graph": { + "nodes": { + "list": [ + { + "name": "1" + }, + { + "name": "2" + }, + { + "name": "3" + }, + { + "name": "4" + }, + { + "name": "5" + }, + { + "name": "6" + } + ] + } + } + }), + ); + } } diff --git a/raphtory-graphql/src/server.rs b/raphtory-graphql/src/server.rs index bc40d85aee..b78531ff68 100644 --- a/raphtory-graphql/src/server.rs +++ b/raphtory-graphql/src/server.rs @@ -320,7 +320,7 @@ mod server_tests { use chrono::prelude::*; use raphtory::{ core::Prop, - prelude::{AdditionOps, Graph, GraphViewOps}, + prelude::{AdditionOps, Graph}, }; use std::collections::HashMap; use tokio::time::{sleep, Duration}; @@ -336,7 +336,7 @@ mod server_tests { None, ) .unwrap(); - let g = graph.materialize().unwrap(); + let g = graph.into(); let graphs = HashMap::from([("test".to_owned(), g)]); let server = RaphtoryServer::from_map(graphs); println!("calling start at time {}", Local::now()); diff --git a/raphtory/src/arrow/graph_impl/core_ops.rs b/raphtory/src/arrow/graph_impl/core_ops.rs index b9d9c455f9..bf8c8ec6a3 100644 --- a/raphtory/src/arrow/graph_impl/core_ops.rs +++ b/raphtory/src/arrow/graph_impl/core_ops.rs @@ -112,7 +112,7 @@ impl CoreGraphOps for ArrowGraph { } fn node_type(&self, _v: VID) -> Option { - todo!("Node types are not supported on arrow yet") + None } fn internalise_node(&self, v: NodeRef) -> Option { diff --git a/raphtory/src/arrow/graph_impl/mod.rs b/raphtory/src/arrow/graph_impl/mod.rs index e1f7f541b8..c22ad54327 100644 --- a/raphtory/src/arrow/graph_impl/mod.rs +++ b/raphtory/src/arrow/graph_impl/mod.rs @@ -1,6 +1,6 @@ use std::{ fmt::{Display, Formatter}, - path::Path, + path::{Path, PathBuf}, sync::Arc, }; @@ -9,6 +9,7 @@ use raphtory_arrow::{ load::ExternalEdgeList, RAError, }; use rayon::prelude::*; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{ arrow::{graph_impl::prop_conversion::make_node_properties_from_graph, Error}, @@ -16,12 +17,20 @@ use crate::{ array::{PrimitiveArray, StructArray}, datatypes::{ArrowDataType as DataType, Field}, }, - core::entities::{ - properties::{graph_meta::GraphMeta, props::Meta}, - LayerIds, + core::{ + entities::{ + graph::tgraph::InternalGraph, + properties::{graph_meta::GraphMeta, props::Meta}, + LayerIds, EID, VID, + }, + utils::errors::GraphError, + Prop, PropType, }, - db::api::view::{internal::Immutable, DynamicGraph, IntoDynamic}, - prelude::{Graph, GraphViewOps}, + db::api::{ + mutation::internal::{InternalAdditionOps, InternalPropertyAdditionOps}, + view::{internal::Immutable, DynamicGraph, IntoDynamic}, + }, + prelude::{Graph, GraphViewOps, TimeIndexEntry}, }; pub mod const_properties_ops; @@ -56,6 +65,30 @@ pub struct ArrowGraph { node_meta: Arc, edge_meta: Arc, graph_props: Arc, + graph_dir: PathBuf, +} + +impl Serialize for ArrowGraph { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let path = self.graph_dir.clone(); + path.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for ArrowGraph { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let path = PathBuf::deserialize(deserializer)?; + let graph_result = ArrowGraph::load_from_dir(&path).map_err(|err| { + serde::de::Error::custom(format!("Failed to load ArrowGraph: {:?}", err)) + })?; + Ok(graph_result) + } } impl Display for ArrowGraph { @@ -146,7 +179,7 @@ impl ArrowGraph { .expect("failed to create graph") } - fn new(inner_graph: TemporalGraph) -> Self { + fn new(inner_graph: TemporalGraph, graph_dir: PathBuf) -> Self { let node_meta = Meta::new(); let mut edge_meta = Meta::new(); let graph_meta = GraphMeta::new(); @@ -194,6 +227,7 @@ impl ArrowGraph { node_meta: Arc::new(node_meta), edge_meta: Arc::new(edge_meta), graph_props: Arc::new(graph_meta), + graph_dir, } } @@ -201,7 +235,7 @@ impl ArrowGraph { let inner_graph = TemporalGraph::from_graph(graph, graph_dir.as_ref(), || { make_node_properties_from_graph(graph, graph_dir.as_ref()) })?; - Ok(Self::new(inner_graph)) + Ok(Self::new(inner_graph, graph_dir.as_ref().to_path_buf())) } pub fn load_from_edge_lists( @@ -213,6 +247,7 @@ impl ArrowGraph { dst_col_idx: usize, time_col_idx: usize, ) -> Result { + let path = graph_dir.as_ref().to_path_buf(); let inner = TemporalGraph::from_sorted_edge_list( graph_dir, src_col_idx, @@ -222,12 +257,13 @@ impl ArrowGraph { t_props_chunk_size, edge_list, )?; - Ok(Self::new(inner)) + Ok(Self::new(inner, path)) } pub fn load_from_dir(graph_dir: impl AsRef) -> Result { + let path = graph_dir.as_ref().to_path_buf(); let inner = TemporalGraph::new(graph_dir)?; - Ok(Self::new(inner)) + Ok(Self::new(inner, path)) } pub fn load_from_parquets>( @@ -272,7 +308,7 @@ impl ArrowGraph { layered_edge_list, node_properties.as_ref().map(|p| p.as_ref()), )?; - Ok(Self::new(t_graph)) + Ok(Self::new(t_graph, graph_dir.as_ref().to_path_buf())) } pub fn filtered_layers_par<'a>( @@ -300,6 +336,7 @@ impl ArrowGraph { } pub fn from_layer(layer: TempColGraphFragment) -> Self { + let path = layer.graph_dir().to_path_buf(); let global_ordering = layer.nodes_storage().gids().clone(); let global_order = ArrowHashMap::from_sorted_dedup(global_ordering.clone()) @@ -311,7 +348,128 @@ impl ArrowGraph { vec![layer], vec!["_default".to_string()], ); - Self::new(inner) + Self::new(inner, path) + } +} + +impl InternalAdditionOps for ArrowGraph { + fn next_event_id(&self) -> usize { + unimplemented!("ArrowGraph is immutable") + } + + fn resolve_layer(&self, layer: Option<&str>) -> usize { + // Will check this + unimplemented!("ArrowGraph is immutable") + } + + fn resolve_node_type(&self, v_id: VID, node_type: Option<&str>) -> Result { + unimplemented!("ArrowGraph is immutable") + } + + fn resolve_node(&self, id: u64, name: Option<&str>) -> VID { + unimplemented!("ArrowGraph is immutable") + } + + fn resolve_graph_property(&self, prop: &str, is_static: bool) -> usize { + unimplemented!("ArrowGraph is immutable") + } + + fn resolve_node_property( + &self, + prop: &str, + dtype: PropType, + is_static: bool, + ) -> Result { + unimplemented!("ArrowGraph is immutable") + } + + fn resolve_edge_property( + &self, + prop: &str, + dtype: PropType, + is_static: bool, + ) -> Result { + unimplemented!("ArrowGraph is immutable") + } + + fn process_prop_value(&self, prop: Prop) -> Prop { + unimplemented!("ArrowGraph is immutable") + } + + fn internal_add_node( + &self, + t: TimeIndexEntry, + v: VID, + props: Vec<(usize, Prop)>, + node_type_id: usize, + ) -> Result<(), GraphError> { + unimplemented!("ArrowGraph is immutable") + } + + fn internal_add_edge( + &self, + t: TimeIndexEntry, + src: VID, + dst: VID, + props: Vec<(usize, Prop)>, + layer: usize, + ) -> Result { + unimplemented!("ArrowGraph is immutable") + } +} + +impl InternalPropertyAdditionOps for ArrowGraph { + fn internal_add_properties( + &self, + t: TimeIndexEntry, + props: Vec<(usize, Prop)>, + ) -> Result<(), GraphError> { + unimplemented!("ArrowGraph is immutable") + } + + fn internal_add_static_properties(&self, props: Vec<(usize, Prop)>) -> Result<(), GraphError> { + unimplemented!("ArrowGraph is immutable") + } + + fn internal_update_static_properties( + &self, + props: Vec<(usize, Prop)>, + ) -> Result<(), GraphError> { + unimplemented!("ArrowGraph is immutable") + } + + fn internal_add_constant_node_properties( + &self, + vid: VID, + props: Vec<(usize, Prop)>, + ) -> Result<(), GraphError> { + unimplemented!("ArrowGraph is immutable") + } + + fn internal_update_constant_node_properties( + &self, + vid: VID, + props: Vec<(usize, Prop)>, + ) -> Result<(), GraphError> { + unimplemented!("ArrowGraph is immutable") + } + + fn internal_add_constant_edge_properties( + &self, + eid: EID, + layer: usize, + props: Vec<(usize, Prop)>, + ) -> Result<(), GraphError> { + unimplemented!("ArrowGraph is immutable") + } + + fn internal_update_constant_edge_properties( + &self, + eid: EID, + layer: usize, + props: Vec<(usize, Prop)>, + ) -> Result<(), GraphError> { + unimplemented!("ArrowGraph is immutable") } } diff --git a/raphtory/src/arrow/graph_impl/prop_conversion.rs b/raphtory/src/arrow/graph_impl/prop_conversion.rs index 648e559e24..eda11f99e6 100644 --- a/raphtory/src/arrow/graph_impl/prop_conversion.rs +++ b/raphtory/src/arrow/graph_impl/prop_conversion.rs @@ -94,6 +94,7 @@ pub fn make_node_properties_from_graph( let props = builder.build().map(Some)?; Ok(props) } + pub fn arrow_dtype_from_prop_type(prop_type: PropType) -> DataType { match prop_type { PropType::Str => DataType::LargeUtf8, diff --git a/raphtory/src/db/api/view/internal/materialize.rs b/raphtory/src/db/api/view/internal/materialize.rs index 7e61190e52..fb5e7a943c 100644 --- a/raphtory/src/db/api/view/internal/materialize.rs +++ b/raphtory/src/db/api/view/internal/materialize.rs @@ -40,6 +40,9 @@ use enum_dispatch::enum_dispatch; use serde::{de::Error, Deserialize, Deserializer, Serialize}; use std::path::Path; +#[cfg(feature = "arrow")] +use crate::arrow::graph_impl::ArrowGraph; + #[enum_dispatch(CoreGraphOps)] #[enum_dispatch(InternalLayerOps)] #[enum_dispatch(ListOps)] @@ -54,6 +57,8 @@ use std::path::Path; #[enum_dispatch(InternalPropertyAdditionOps)] #[derive(Serialize, Deserialize, Clone)] pub enum MaterializedGraph { + #[cfg(feature = "arrow")] + ArrowEventGraph(ArrowGraph), EventGraph(Graph), PersistentGraph(PersistentGraph), } @@ -64,7 +69,7 @@ where { let version = u32::deserialize(deserializer)?; if version != BINCODE_VERSION { - return Err(D::Error::custom(GraphError::BincodeVersionError( + return Err(Error::custom(GraphError::BincodeVersionError( version, BINCODE_VERSION, ))); @@ -86,10 +91,14 @@ impl MaterializedGraph { match self { MaterializedGraph::EventGraph(g) => Some(g), MaterializedGraph::PersistentGraph(_) => None, + #[cfg(feature = "arrow")] + MaterializedGraph::ArrowEventGraph(_) => None, } } pub fn into_persistent(self) -> Option { match self { + #[cfg(feature = "arrow")] + MaterializedGraph::ArrowEventGraph(_) => None, MaterializedGraph::EventGraph(_) => None, MaterializedGraph::PersistentGraph(g) => Some(g), } diff --git a/raphtory/src/python/graph/views/graph_view.rs b/raphtory/src/python/graph/views/graph_view.rs index c593a37868..ee79aac768 100644 --- a/raphtory/src/python/graph/views/graph_view.rs +++ b/raphtory/src/python/graph/views/graph_view.rs @@ -36,6 +36,7 @@ use pyo3::{prelude::*, types::PyBytes}; impl IntoPy for MaterializedGraph { fn into_py(self, py: Python<'_>) -> PyObject { match self { + MaterializedGraph::ArrowEventGraph(g) => g.into_py(py), MaterializedGraph::EventGraph(g) => g.into_py(py), MaterializedGraph::PersistentGraph(g) => g.into_py(py), } diff --git a/raphtory/src/search/mod.rs b/raphtory/src/search/mod.rs index 1aacb03e1b..2e12609094 100644 --- a/raphtory/src/search/mod.rs +++ b/raphtory/src/search/mod.rs @@ -204,7 +204,13 @@ impl<'graph, G: GraphViewOps<'graph>> IndexedGraph { if prop_names_set.is_empty() { break; } - let mut found_props: HashSet = HashSet::default(); + let mut found_props: HashSet = HashSet::from([ + fields::TIME.into(), + fields::VERTEX_ID.into(), + fields::VERTEX_ID_REV.into(), + fields::NAME.into(), + fields::NODE_TYPE.into(), + ]); found_props.insert("name".into()); for prop in prop_names_set.iter() { @@ -257,7 +263,12 @@ impl<'graph, G: GraphViewOps<'graph>> IndexedGraph { if prop_names_set.is_empty() { break; } - let mut found_props: HashSet = HashSet::new(); + let mut found_props: HashSet = HashSet::from([ + fields::TIME.into(), + fields::SOURCE.into(), + fields::DESTINATION.into(), + fields::EDGE_ID.into(), + ]); for prop in prop_names_set.iter() { // load temporal props