Skip to content

Commit

Permalink
Todos/graphql (#1676)
Browse files Browse the repository at this point in the history
* remove timeout from run and set a sensible default

* redo appconfig building

* skip graphs that don't have name as property instead of failing

* rename graphs names gql to name

* remove Graphs hashmap/paths from the RaphtoryServer so all uploads are done via the client

* rid expects

* impl graph file upload

* impl load graph from path

* impl overwrite graph

* impl namespace

* handle more namespace validation cases, add tests

* impl get_graph to provide namespace as arg, modified gqlgraphs to hold both names, and namespaces but not graphs saving memory, impl path api on both gqlgraph and gqlgraphs, fixed and added tests

* refactor and fix save_graph

* impl load_graph_from_path ns

* fix issue with dir creation

* add rename graph tests, rid saving graph path as graph property, add parent_graph_namespace to save graph and rename graph, add more validation in these apis

* add tests for get graphs

* add received graph tests

* add test for get graph

* add test for update last opened

* add isarchive tests

* fix save and add tests

* fix save graph issue

* impl create_graph, update_graph and add tests for update graph with new name

* impl tests for update graph

* add tests for send graph

* fix upload graph to accept namespace and add tests

* fix load graph and add tests

* impl tests for get graph

* impl tests for get graphs

* fix receive graph versioning issue and add tests

* impl rename graph tests

* fix archive tests

* fix rename graph and add comments

* Change graph_nodes to be a vector of string than string, fix issues with send_graph from py and gql, receive graph versioning, refactors

* rename renamegraph to movegraph and impl copy graph, add tests for move graph

* add tests fro copy graph

* impl delete graph gql api and add tests

* add properties to nodes and edges to test if they are carried forward when creating new graph with update_graph

* fix create graph and add tests

* rid dependency of graph name as graph prop, fix tests
  • Loading branch information
shivamka1 authored Jul 16, 2024
1 parent 6528121 commit 364aea5
Show file tree
Hide file tree
Showing 20 changed files with 3,756 additions and 1,080 deletions.
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pyo3 = { workspace = true }
raphtory_core = { path = "../raphtory", version = "0.9.1", features = ["python", "search", "vectors"], package = "raphtory" }
raphtory-graphql = { path = "../raphtory-graphql", version = "0.9.1" }
serde_json = { workspace = true }
reqwest = { workspace = true }
reqwest = { workspace = true, features = ["multipart"] }
tokio = { workspace = true }
crossbeam-channel = { workspace = true }
serde = { workspace = true }
Expand Down
265 changes: 183 additions & 82 deletions python/src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ use raphtory_graphql::{
global_plugins::GlobalPlugins, vector_algorithms::VectorAlgorithms,
},
server_config::*,
url_encode_graph, RaphtoryServer,
url_encode::url_encode_graph,
RaphtoryServer,
};
use reqwest::Client;
use reqwest::{multipart, multipart::Part, Client};
use serde_json::{json, Map, Number, Value as JsonValue};
use std::{
collections::HashMap,
fs::File,
io::Read,
path::{Path, PathBuf},
thread,
thread::{sleep, JoinHandle},
time::Duration,
};
use tokio::{self, io::Result as IoResult};
use tokio::{self, io::Result as IoResult, runtime::Runtime};

/// A class for accessing graphs hosted in a Raphtory GraphQL server and running global search for
/// graph documents
Expand Down Expand Up @@ -140,7 +143,7 @@ impl PyRaphtoryServer {
&PathBuf::from(cache),
Some(template),
)
.await;
.await?;
Ok(Self::new(new_server))
})
}
Expand Down Expand Up @@ -222,34 +225,40 @@ impl PyRaphtoryServer {
impl PyRaphtoryServer {
#[new]
#[pyo3(
signature = (work_dir, graphs = None, graph_paths = None, cache_capacity = 30, cache_tti_seconds = 900, client_id = None, client_secret = None, tenant_id = None)
signature = (work_dir, cache_capacity = None, cache_tti_seconds = None, client_id = None, client_secret = None, tenant_id = None, log_level = None, config_path = None)
)]
fn py_new(
work_dir: String,
graphs: Option<HashMap<String, MaterializedGraph>>,
graph_paths: Option<Vec<String>>,
cache_capacity: u64,
cache_tti_seconds: u64,
work_dir: PathBuf,
cache_capacity: Option<u64>,
cache_tti_seconds: Option<u64>,
client_id: Option<String>,
client_secret: Option<String>,
tenant_id: Option<String>,
log_level: Option<String>,
config_path: Option<PathBuf>,
) -> PyResult<Self> {
let graph_paths = graph_paths.map(|paths| paths.into_iter().map(PathBuf::from).collect());
let server = RaphtoryServer::new(
Path::new(&work_dir),
graphs,
graph_paths,
Some(CacheConfig {
capacity: cache_capacity,
tti_seconds: cache_tti_seconds,
}),
Some(AuthConfig {
client_id,
client_secret,
tenant_id,
}),
None,
);
let mut app_config_builder = AppConfigBuilder::new();
if let Some(log_level) = log_level {
app_config_builder = app_config_builder.with_log_level(log_level);
}
if let Some(cache_capacity) = cache_capacity {
app_config_builder = app_config_builder.with_cache_capacity(cache_capacity);
}
if let Some(cache_tti_seconds) = cache_tti_seconds {
app_config_builder = app_config_builder.with_cache_tti_seconds(cache_tti_seconds);
}
if let Some(client_id) = client_id {
app_config_builder = app_config_builder.with_auth_client_id(client_id);
}
if let Some(client_secret) = client_secret {
app_config_builder = app_config_builder.with_auth_client_secret(client_secret);
}
if let Some(tenant_id) = tenant_id {
app_config_builder = app_config_builder.with_auth_tenant_id(tenant_id);
}
let app_config = Some(app_config_builder.build());

let server = RaphtoryServer::new(work_dir, app_config, config_path)?;
Ok(PyRaphtoryServer::new(server))
}

Expand Down Expand Up @@ -364,13 +373,12 @@ impl PyRaphtoryServer {
/// * `enable_auth`: enable authentication (defaults to False).
/// * `timeout_in_milliseconds`: wait for server to be online (defaults to 5000). The server is stopped if not online within timeout_in_milliseconds but manages to come online as soon as timeout_in_milliseconds finishes!
#[pyo3(
signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false, timeout_in_milliseconds = None)
signature = (port = 1736, enable_tracing = false, enable_auth = false, timeout_in_milliseconds = None)
)]
pub fn start(
slf: PyRefMut<Self>,
py: Python,
port: u16,
log_level: String,
enable_tracing: bool,
enable_auth: bool,
timeout_in_milliseconds: Option<u64>,
Expand All @@ -386,9 +394,8 @@ impl PyRaphtoryServer {
.build()
.unwrap()
.block_on(async move {
let handler =
server.start_with_port(port, Some(&log_level), enable_tracing, enable_auth);
let running_server = handler.await;
let handler = server.start_with_port(port, enable_tracing, enable_auth);
let running_server = handler.await?;
let tokio_sender = running_server._get_sender().clone();
tokio::task::spawn_blocking(move || {
match receiver.recv().expect("Failed to wait for cancellation") {
Expand Down Expand Up @@ -426,27 +433,17 @@ impl PyRaphtoryServer {
/// Arguments:
/// * `port`: the port to use (defaults to 1736).
#[pyo3(
signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false, timeout_in_milliseconds = None)
signature = (port = 1736, enable_tracing = false, enable_auth = false)
)]
pub fn run(
slf: PyRefMut<Self>,
py: Python,
port: u16,
log_level: String,
enable_tracing: bool,
enable_auth: bool,
timeout_in_milliseconds: Option<u64>,
) -> PyResult<()> {
let mut server = Self::start(
slf,
py,
port,
log_level,
enable_tracing,
enable_auth,
timeout_in_milliseconds,
)?
.server_handler;
let mut server =
Self::start(slf, py, port, enable_tracing, enable_auth, Some(180000))?.server_handler;
py.allow_threads(|| wait_server(&mut server))
}
}
Expand All @@ -464,7 +461,7 @@ fn adapt_graphql_value(value: &ValueAccessor, py: Python) -> PyObject {
}
GraphqlValue::String(value) => value.to_object(py),
GraphqlValue::Boolean(value) => value.to_object(py),
value => panic!("graphql input value {value} has an unsuported type"),
value => panic!("graphql input value {value} has an unsupported type"),
}
}

Expand Down Expand Up @@ -642,30 +639,6 @@ impl PyRaphtoryClient {
.map_err(|err| adapt_err_value(&err))
.map(|json| (request_body, json))
}

fn load_graphs(
&self,
py: Python,
path: String,
overwrite: bool,
) -> PyResult<HashMap<String, PyObject>> {
let query =
format!("mutation {{ loadGraphsFromPath(path: \"{path}\", overwrite: {overwrite}) }}");
let variables = [];

let data = self.query_with_json_variables(query.clone(), variables.into())?;

match data.get("loadGraphsFromPath") {
Some(JsonValue::Array(loads)) => {
let num_graphs = loads.len();
println!("Loaded {num_graphs} graph(s)");
translate_map_to_python(py, data)
}
_ => Err(PyException::new_err(format!(
"Error while reading server response for query:\n\t{query}\nGot data:\n\t'{data:?}'"
))),
}
}
}

const WAIT_CHECK_INTERVAL_MILLIS: u64 = 200;
Expand Down Expand Up @@ -710,31 +683,38 @@ impl PyRaphtoryClient {
translate_map_to_python(py, data)
}

/// Send a graph to the server.
/// Send a graph to the server
///
/// Arguments:
/// * `name`: the name of the graph sent.
/// * `graph`: the graph to send.
/// * `name`: the name of the graph
/// * `graph`: the graph to send
/// * `overwrite`: overwrite existing graph (defaults to False)
/// * `namespace`: the namespace of the graph
///
/// Returns:
/// The `data` field from the graphQL response after executing the mutation.
#[pyo3(signature = (name, graph, overwrite = false, namespace = None))]
fn send_graph(
&self,
py: Python,
name: String,
graph: MaterializedGraph,
overwrite: bool,
namespace: Option<String>,
) -> PyResult<HashMap<String, PyObject>> {
let encoded_graph = encode_graph(graph)?;

let query = r#"
mutation SendGraph($name: String!, $graph: String!) {
sendGraph(name: $name, graph: $graph)
mutation SendGraph($name: String!, $graph: String!, $overwrite: Boolean!, $namespace: String) {
sendGraph(name: $name, graph: $graph, overwrite: $overwrite, namespace: $namespace)
}
"#
.to_owned();
.to_owned();
let variables = [
("name".to_owned(), json!(name)),
("graph".to_owned(), json!(encoded_graph)),
("overwrite".to_owned(), json!(overwrite)),
("namespace".to_owned(), json!(namespace)),
];

let data = self.query_with_json_variables(query, variables.into())?;
Expand All @@ -751,22 +731,143 @@ impl PyRaphtoryClient {
}
}

/// Set the server to load all the graphs from its path `path`.
/// Upload graph file from a path `file_path` on the client
///
/// Arguments:
/// * `name`: the name of the graph
/// * `file_path`: the path of the graph on the client
/// * `overwrite`: overwrite existing graph (defaults to False)
/// * `namespace`: the namespace of the graph
///
/// Returns:
/// The `data` field from the graphQL response after executing the mutation.
#[pyo3(signature = (name, file_path, overwrite = false, namespace = None))]
fn upload_graph(
&self,
py: Python,
name: String,
file_path: String,
overwrite: bool,
namespace: Option<String>,
) -> PyResult<HashMap<String, PyObject>> {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let client = Client::new();

let mut file = File::open(Path::new(&file_path)).map_err(|err| adapt_err_value(&err))?;

let mut buffer = Vec::new();
file.read_to_end(&mut buffer).map_err(|err| adapt_err_value(&err))?;

let mut variables = format!(
r#""name": "{}", "overwrite": {}, "graph": null"#,
name, overwrite
);

if let Some(ns) = &namespace {
variables = format!(r#""namespace": "{}", {}"#, ns, variables);
}

let operations = format!(
r#"{{
"query": "mutation UploadGraph($name: String!, $graph: Upload!, $overwrite: Boolean!{}) {{ uploadGraph(name: $name, graph: $graph, overwrite: $overwrite{}) }}",
"variables": {{ {} }}
}}"#,
if namespace.is_some() { ", $namespace: String" } else { "" },
if namespace.is_some() { ", namespace: $namespace" } else { "" },
variables
);

let form = multipart::Form::new()
.text("operations", operations)
.text("map", r#"{"0": ["variables.graph"]}"#)
.part("0", Part::bytes(buffer).file_name(file_path.clone()));

let response = client
.post(&self.url)
.multipart(form)
.send()
.await
.map_err(|err| adapt_err_value(&err))?;

let status = response.status();
let text = response.text().await.map_err(|err| adapt_err_value(&err))?;

if !status.is_success() {
return Err(PyException::new_err(format!(
"Error Uploading Graph. Status: {}. Response: {}",
status, text
)));
}

let mut data: HashMap<String, JsonValue> = serde_json::from_str(&text).map_err(|err| {
PyException::new_err(format!(
"Failed to parse JSON response: {}. Response text: {}",
err, text
))
})?;

match data.remove("data") {
Some(JsonValue::Object(data)) => {
let mut result_map = HashMap::new();
for (key, value) in data {
result_map.insert(key, translate_to_python(py, value)?);
}
Ok(result_map)
}
_ => match data.remove("errors") {
Some(JsonValue::Array(errors)) => Err(PyException::new_err(format!(
"Error Uploading Graph. Got errors:\n\t{:#?}",
errors
))),
_ => Err(PyException::new_err(format!(
"Error Uploading Graph. Unexpected response: {}",
text
))),
},
}
})
}

/// Load graph from a path `path` on the server.
///
/// Arguments:
/// * `path`: the path to load the graphs from.
/// * `overwrite`: overwrite existing graphs (defaults to False)
/// * `file_path`: the path to load the graph from.
/// * `overwrite`: overwrite existing graph (defaults to False)
///
/// Returns:
/// The `data` field from the graphQL response after executing the mutation.
#[pyo3(signature = (path, overwrite = false))]
fn load_graphs_from_path(
#[pyo3(signature = (file_path, overwrite = false, namespace = None))]
fn load_graph(
&self,
py: Python,
path: String,
file_path: String,
overwrite: bool,
namespace: Option<String>,
) -> PyResult<HashMap<String, PyObject>> {
self.load_graphs(py, path, overwrite)
let query = r#"
mutation LoadGraph($file_path: String!, $overwrite: Boolean!, $namespace: String) {
loadGraphFromPath(filePath: $file_path, overwrite: $overwrite, namespace: $namespace)
}
"#
.to_owned();
let variables = [
("file_path".to_owned(), json!(file_path)),
("overwrite".to_owned(), json!(overwrite)),
("namespace".to_owned(), json!(namespace)),
];

let data = self.query_with_json_variables(query.clone(), variables.into())?;

match data.get("loadGraphFromPath") {
Some(JsonValue::String(name)) => {
println!("Loaded graph: '{name}'");
translate_map_to_python(py, data)
}
_ => Err(PyException::new_err(format!(
"Error while reading server response for query:\n\t{query}\nGot data:\n\t'{data:?}'"
))),
}
}
}

Expand Down
Loading

0 comments on commit 364aea5

Please sign in to comment.