Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix arrowgraph for gql #1619

Merged
merged 13 commits into from
May 31, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion python/tests/test_graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def test_windows_and_layers():
after(time: 500) {
history
neighbours {
list {
list {
name
before(time: 300) {
history
Expand Down
3 changes: 3 additions & 0 deletions raphtory-graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ toml = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }

[features]
arrow = ["raphtory/arrow"]
97 changes: 76 additions & 21 deletions raphtory-graphql/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use parking_lot::RwLock;
#[cfg(feature = "arrow")]
use raphtory::arrow::graph_impl::ArrowGraph;
use raphtory::{
core::Prop,
db::api::view::MaterializedGraph,
prelude::{GraphViewOps, PropUnwrap, PropertyAdditionOps},
search::IndexedGraph,
vectors::vectorised_graph::DynamicVectorisedGraph,
};
use std::{collections::HashMap, path::Path, sync::Arc};
use std::{collections::HashMap, fs, path::Path, sync::Arc};
use walkdir::WalkDir;

#[derive(Default)]
Expand Down Expand Up @@ -85,41 +87,94 @@ impl Data {
}

pub fn load_from_file(path: &str) -> HashMap<String, IndexedGraph<MaterializedGraph>> {
let valid_entries = WalkDir::new(path).into_iter().filter_map(|e| {
let entry = e.ok()?;
let path = entry.path();
let filename = path.file_name().and_then(|name| name.to_str())?;
(path.is_file() && !filename.starts_with('.')).then_some(entry)
});
fn get_graph_name(path: &Path, graph: &MaterializedGraph) -> String {
graph
.properties()
.get("name")
.into_str()
.map(|v| v.to_string())
.unwrap_or_else(|| path.file_name().unwrap().to_str().unwrap().to_owned())
}

let mut graphs: HashMap<String, IndexedGraph<MaterializedGraph>> = HashMap::default();
fn is_arrow_graph_dir(path: &Path) -> bool {
// Check if the directory contains files specific to arrow graphs
let files = fs::read_dir(path).unwrap();
let mut has_arrow_files = false;
for file in files {
let file_name = file.unwrap().file_name().into_string().unwrap();
if file_name.ends_with(".ipc") {
has_arrow_files = true;
break;
}
}
has_arrow_files
}

for entry in valid_entries {
let path = entry.path();
fn load_bincode_graph(path: &Path) -> (String, MaterializedGraph) {
let path_string = path.display().to_string();
println!("loading graph from {path_string}");
let graph =
MaterializedGraph::load_from_file(path, false).expect("Unable to load from graph");
let graph_name = graph
.properties()
.get("name")
.into_str()
.map(|v| v.to_string())
.unwrap_or_else(|| path.file_name().unwrap().to_str().unwrap().to_owned());
let graph_name = get_graph_name(path, &graph);
graph
.update_constant_properties([("path".to_string(), Prop::str(path_string.clone()))])
.expect("Failed to add static property");

(graph_name, graph)
}

#[cfg(feature = "arrow")]
fn load_arrow_graph(path: &Path) -> (String, MaterializedGraph) {
let arrow_graph =
ArrowGraph::load_from_dir(path).expect("Unable to load from arrow graph");
let graph: MaterializedGraph = arrow_graph.into();
let graph_name = get_graph_name(path, &graph);

(graph_name, graph)
}

#[cfg(not(feature = "arrow"))]
fn load_arrow_graph(path: &Path) -> (String, MaterializedGraph) {
unimplemented!("Arrow feature not enabled, cannot load from arrow graph")
}

fn add_to_graphs(
graphs: &mut HashMap<String, IndexedGraph<MaterializedGraph>>,
graph_name: &str,
graph: &MaterializedGraph,
) {
if let Some(old_graph) = graphs.insert(
graph_name,
IndexedGraph::from_graph(&graph).expect("Unable to index graph"),
graph_name.to_string(),
IndexedGraph::from_graph(graph).expect("Unable to index graph"),
) {
// insertion returns the old value if the entry already existed
let old_path = old_graph.properties().get("path").unwrap_str();
let name = old_graph.properties().get("name").unwrap_str();
panic!("Graph with name {name} defined multiple times, first file: {old_path}, second file: {path_string}")
panic!(
"Graph with name {} defined multiple times, first file: {}, second file: {}",
name, old_path, graph_name
);
}
}

let mut graphs: HashMap<String, IndexedGraph<MaterializedGraph>> = HashMap::default();

for entry in fs::read_dir(path).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if path.is_dir() {
println!("Arrow Graph loaded = {}", path.display());
if is_arrow_graph_dir(&path) {
if let (graph_name, graph) = load_arrow_graph(&path) {
add_to_graphs(&mut graphs, &graph_name, &graph);
}
}
} else {
println!("Graph loaded = {}", path.display());
if let (graph_name, graph) = load_bincode_graph(&path) {
add_to_graphs(&mut graphs, &graph_name, &graph);
}
}
}

graphs
}
}
99 changes: 97 additions & 2 deletions raphtory-graphql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,18 @@ mod graphql_test {
use crate::{data::Data, model::App};
use async_graphql::UploadValue;
use dynamic_graphql::{Request, Variables};
#[cfg(feature = "arrow")]
use raphtory::arrow::graph_impl::ArrowGraph;
use raphtory::{
db::{api::view::IntoDynamic, graph::views::deletion_graph::PersistentGraph},
prelude::*,
};
use serde_json::json;
use std::collections::{HashMap, HashSet};
use tempfile::tempdir;
use std::{
collections::{HashMap, HashSet},
path::Path,
};
use tempfile::{tempdir, TempDir};

#[tokio::test]
async fn search_for_gandalf_query() {
Expand All @@ -68,6 +73,7 @@ mod graphql_test {
)
.expect("Could not add node!");

let graph: MaterializedGraph = graph.into();
let graphs = HashMap::from([("lotr".to_string(), graph)]);
let data = Data::from_map(graphs);
let schema = App::create_schema().data(data).finish().unwrap();
Expand Down Expand Up @@ -106,6 +112,7 @@ mod graphql_test {
.add_node(0, 11, NO_PROPS, None)
.expect("Could not add node!");

let graph: MaterializedGraph = graph.into();
let graphs = HashMap::from([("lotr".to_string(), graph)]);
let data = Data::from_map(graphs);

Expand Down Expand Up @@ -153,6 +160,7 @@ mod graphql_test {
None,
)
.unwrap();
let graph: MaterializedGraph = graph.into();

let graphs = HashMap::from([("graph".to_string(), graph)]);
let data = Data::from_map(graphs);
Expand Down Expand Up @@ -1085,4 +1093,91 @@ mod graphql_test {
}),
);
}

#[cfg(feature = "arrow")]
#[tokio::test]
async fn test_arrow_graph() {
let graph = Graph::new();
graph.add_constant_properties([("name", "graph")]).unwrap();
graph.add_node(1, 1, NO_PROPS, Some("a")).unwrap();
graph.add_node(1, 2, NO_PROPS, Some("b")).unwrap();
graph.add_node(1, 3, NO_PROPS, Some("b")).unwrap();
graph.add_node(1, 4, NO_PROPS, Some("a")).unwrap();
graph.add_node(1, 5, NO_PROPS, Some("c")).unwrap();
graph.add_node(1, 6, NO_PROPS, Some("e")).unwrap();
graph.add_edge(22, 1, 2, NO_PROPS, Some("a")).unwrap();
graph.add_edge(22, 3, 2, NO_PROPS, Some("a")).unwrap();
graph.add_edge(22, 2, 4, NO_PROPS, Some("a")).unwrap();
graph.add_edge(22, 4, 5, NO_PROPS, Some("a")).unwrap();
graph.add_edge(22, 4, 5, NO_PROPS, Some("a")).unwrap();
graph.add_edge(22, 5, 6, NO_PROPS, Some("a")).unwrap();
graph.add_edge(22, 3, 6, NO_PROPS, Some("a")).unwrap();

let test_dir = TempDir::new().unwrap();
let arrow_graph = ArrowGraph::from_graph(&graph, test_dir.path()).unwrap();
let graph: MaterializedGraph = arrow_graph.into();

let graphs = HashMap::from([("graph".to_string(), graph)]);
let data = Data::from_map(graphs);
let schema = App::create_schema().data(data).finish().unwrap();

let req = r#"
{
graph(name: "graph") {
nodes {
list {
name
}
}
}
}
"#;

let req = Request::new(req);
let res = schema.execute(req).await;
let data = res.data.into_json().unwrap();
assert_eq!(
data,
json!({
"graph": {
"nodes": {
"list": [
{
"name": "1"
},
{
"name": "2"
},
{
"name": "3"
},
{
"name": "4"
},
{
"name": "5"
},
{
"name": "6"
}
]
}
}
}),
);

let req = &format!(
r#"mutation {{
updateGraphLastOpened(graphName: "{}")
}}"#,
"graph"
);

let req = Request::new(req);
let res = schema.execute(req).await;
let data = res.errors;
let error_message = &data[0].message;
let expected_error_message = "Arrow Graph is immutable";
assert_eq!(error_message, expected_error_message);
}
}
34 changes: 31 additions & 3 deletions raphtory-graphql/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ impl Display for MissingGraph {

impl Error for MissingGraph {}

#[derive(thiserror::Error, Debug)]
pub enum GqlGraphError {
#[error("Arrow Graph is immutable")]
ImmutableArrowGraph,
}

#[derive(ResolvedObject)]
#[graphql(root)]
pub(crate) struct QueryRoot;
Expand Down Expand Up @@ -132,10 +138,16 @@ impl Mut {
.into());
}

if new_graph_name.ne(&graph_name) && parent_graph_name.ne(&graph_name) {
let mut data = ctx.data_unchecked::<Data>().graphs.write();
let mut data = ctx.data_unchecked::<Data>().graphs.write();

let subgraph = data.get(&graph_name).ok_or("Graph not found")?;

#[cfg(feature = "arrow")]
if subgraph.clone().graph.into_arrow().is_some() {
return Err(GqlGraphError::ImmutableArrowGraph.into());
}

let subgraph = data.get(&graph_name).ok_or("Graph not found")?;
if new_graph_name.ne(&graph_name) && parent_graph_name.ne(&graph_name) {
let path = subgraph
.properties()
.constant()
Expand Down Expand Up @@ -181,6 +193,11 @@ impl Mut {

let subgraph = data.get(&graph_name).ok_or("Graph not found")?;

#[cfg(feature = "arrow")]
if subgraph.clone().graph.into_arrow().is_some() {
return Err(GqlGraphError::ImmutableArrowGraph.into());
}

let dt = Utc::now();
let timestamp: i64 = dt.timestamp();

Expand Down Expand Up @@ -212,6 +229,11 @@ impl Mut {
let parent_graph = data.get(&parent_graph_name).ok_or("Graph not found")?;
let subgraph = data.get(&graph_name).ok_or("Graph not found")?;

#[cfg(feature = "arrow")]
if subgraph.clone().graph.into_arrow().is_some() {
return Err(GqlGraphError::ImmutableArrowGraph.into());
}

let path = match data.get(&new_graph_name) {
Some(new_graph) => new_graph
.properties()
Expand Down Expand Up @@ -359,6 +381,12 @@ impl Mut {
) -> Result<bool> {
let data = ctx.data_unchecked::<Data>().graphs.write();
let subgraph = data.get(&graph_name).ok_or("Graph not found")?;

#[cfg(feature = "arrow")]
if subgraph.clone().graph.into_arrow().is_some() {
return Err(GqlGraphError::ImmutableArrowGraph.into());
}

subgraph.update_constant_properties([("isArchive", Prop::U8(is_archive))])?;

let path = subgraph
Expand Down
4 changes: 2 additions & 2 deletions raphtory-graphql/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ mod server_tests {
use chrono::prelude::*;
use raphtory::{
core::Prop,
prelude::{AdditionOps, Graph, GraphViewOps},
prelude::{AdditionOps, Graph},
};
use std::collections::HashMap;
use tokio::time::{sleep, Duration};
Expand All @@ -336,7 +336,7 @@ mod server_tests {
None,
)
.unwrap();
let g = graph.materialize().unwrap();
let g = graph.into();
let graphs = HashMap::from([("test".to_owned(), g)]);
let server = RaphtoryServer::from_map(graphs);
println!("calling start at time {}", Local::now());
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/arrow/graph_impl/core_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl CoreGraphOps for ArrowGraph {
}

fn node_type(&self, _v: VID) -> Option<ArcStr> {
todo!("Node types are not supported on arrow yet")
None
}

fn internalise_node(&self, v: NodeRef) -> Option<VID> {
Expand Down
Loading
Loading