From 652812146bf9e312b1e035b961f210b86c2d082d Mon Sep 17 00:00:00 2001 From: Shivam Kapoor <4599890+iamsmkr@users.noreply.github.com> Date: Fri, 28 Jun 2024 10:21:37 +0100 Subject: [PATCH] Todos/graphql (#1672) * impl gqlgraphs * properties, unique layers to gqlgraphs * impl default configs, impl config load precedence, add tests * impl auth configs * wait for server to come online when started, also expose an api on client to see if server is already online, add tests * add client tests * fix tests * add load graphs tests * rid duplicate tests * wait internalised * rid silly auth config defaults --- python/src/graphql.rs | 194 ++++----- python/tests/test_algorithms.py | 1 - python/tests/test_graphdb.py | 6 - python/tests/test_graphql.py | 461 +++++++++------------ raphtory-graphql/config.toml | 6 - raphtory-graphql/src/data.rs | 34 +- raphtory-graphql/src/lib.rs | 38 +- raphtory-graphql/src/main.rs | 4 +- raphtory-graphql/src/model/graph/graphs.rs | 48 +++ raphtory-graphql/src/model/graph/mod.rs | 1 + raphtory-graphql/src/model/mod.rs | 14 +- raphtory-graphql/src/server.rs | 89 ++-- raphtory-graphql/src/server_config.rs | 203 ++++++++- 13 files changed, 623 insertions(+), 476 deletions(-) delete mode 100644 raphtory-graphql/config.toml create mode 100644 raphtory-graphql/src/model/graph/graphs.rs diff --git a/python/src/graphql.rs b/python/src/graphql.rs index e099b9caf5..8334bca67a 100644 --- a/python/src/graphql.rs +++ b/python/src/graphql.rs @@ -31,7 +31,7 @@ use raphtory_graphql::{ algorithm_entry_point::AlgorithmEntryPoint, document::GqlDocument, global_plugins::GlobalPlugins, vector_algorithms::VectorAlgorithms, }, - server_config::CacheConfig, + server_config::*, url_encode_graph, RaphtoryServer, }; use reqwest::Client; @@ -222,7 +222,7 @@ impl PyRaphtoryServer { impl PyRaphtoryServer { #[new] #[pyo3( - signature = (work_dir, graphs = None, graph_paths = None, cache_capacity = 30, cache_tti_seconds = 900) + signature = (work_dir, graphs = None, graph_paths = None, cache_capacity = 30, cache_tti_seconds = 900, client_id = None, client_secret = None, tenant_id = None) )] fn py_new( work_dir: String, @@ -230,6 +230,9 @@ impl PyRaphtoryServer { graph_paths: Option>, cache_capacity: u64, cache_tti_seconds: u64, + client_id: Option, + client_secret: Option, + tenant_id: Option, ) -> PyResult { let graph_paths = graph_paths.map(|paths| paths.into_iter().map(PathBuf::from).collect()); let server = RaphtoryServer::new( @@ -240,6 +243,12 @@ impl PyRaphtoryServer { capacity: cache_capacity, tti_seconds: cache_tti_seconds, }), + Some(AuthConfig { + client_id, + client_secret, + tenant_id, + }), + None, ); Ok(PyRaphtoryServer::new(server)) } @@ -350,15 +359,21 @@ impl PyRaphtoryServer { /// /// Arguments: /// * `port`: the port to use (defaults to 1736). + /// * `log_level`: set log level (defaults to INFO). + /// * `enable_tracing`: enable tracing (defaults to False). + /// * `enable_auth`: enable authentication (defaults to False). + /// * `timeout_in_milliseconds`: wait for server to be online (defaults to 5000). The server is stopped if not online within timeout_in_milliseconds but manages to come online as soon as timeout_in_milliseconds finishes! #[pyo3( - signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false) + signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false, timeout_in_milliseconds = None) )] pub fn start( slf: PyRefMut, + py: Python, port: u16, log_level: String, enable_tracing: bool, enable_auth: bool, + timeout_in_milliseconds: Option, ) -> PyResult { let (sender, receiver) = crossbeam_channel::bounded::(1); let server = take_server_ownership(slf)?; @@ -389,7 +404,21 @@ impl PyRaphtoryServer { }) }); - Ok(PyRunningRaphtoryServer::new(join_handle, sender, port)) + let mut server = PyRunningRaphtoryServer::new(join_handle, sender, port); + if let Some(server_handler) = &server.server_handler { + match PyRunningRaphtoryServer::wait_for_server_online( + &server_handler.client.url, + timeout_in_milliseconds, + ) { + Ok(_) => return Ok(server), + Err(e) => { + PyRunningRaphtoryServer::stop_server(&mut server, py)?; + Err(e) + } + } + } else { + Err(PyException::new_err("Failed to start server")) + } } /// Run the server until completion. @@ -397,7 +426,7 @@ impl PyRaphtoryServer { /// Arguments: /// * `port`: the port to use (defaults to 1736). #[pyo3( - signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false) + signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false, timeout_in_milliseconds = None) )] pub fn run( slf: PyRefMut, @@ -406,9 +435,18 @@ impl PyRaphtoryServer { log_level: String, enable_tracing: bool, enable_auth: bool, + timeout_in_milliseconds: Option, ) -> PyResult<()> { - let mut server = - Self::start(slf, port, log_level, enable_tracing, enable_auth)?.server_handler; + let mut server = Self::start( + slf, + py, + port, + log_level, + enable_tracing, + enable_auth, + timeout_in_milliseconds, + )? + .server_handler; py.allow_threads(|| wait_server(&mut server)) } } @@ -495,85 +533,54 @@ impl PyRunningRaphtoryServer { None => Err(PyException::new_err(RUNNING_SERVER_CONSUMED_MSG)), } } -} -#[pymethods] -impl PyRunningRaphtoryServer { - /// Stop the server. - pub(crate) fn stop(&self) -> PyResult<()> { - self.apply_if_alive(|handler| { + fn wait_for_server_online(url: &String, timeout_in_milliseconds: Option) -> PyResult<()> { + let millis = timeout_in_milliseconds.unwrap_or(5000); + let num_intervals = millis / WAIT_CHECK_INTERVAL_MILLIS; + + for _ in 0..num_intervals { + if is_online(url)? { + return Ok(()); + } else { + sleep(Duration::from_millis(WAIT_CHECK_INTERVAL_MILLIS)) + } + } + + Err(PyException::new_err(format!( + "Failed to start server in {} milliseconds", + millis + ))) + } + + fn stop_server(&mut self, py: Python) -> PyResult<()> { + Self::apply_if_alive(self, |handler| { handler .sender .send(BridgeCommand::StopServer) .expect("Failed when sending cancellation signal"); Ok(()) - }) - } - - /// Wait until server completion. - pub(crate) fn wait(mut slf: PyRefMut, py: Python) -> PyResult<()> { - let server = &mut slf.server_handler; + })?; + let server = &mut self.server_handler; py.allow_threads(|| wait_server(server)) } +} - /// Wait for the server to be online. - /// - /// Arguments: - /// * `timeout_millis`: the timeout in milliseconds (default 5000). - fn wait_for_online(&self, timeout_millis: Option) -> PyResult<()> { - self.apply_if_alive(|handler| handler.client.wait_for_online(timeout_millis)) +#[pymethods] +impl PyRunningRaphtoryServer { + pub(crate) fn get_client(&self) -> PyResult { + self.apply_if_alive(|handler| Ok(handler.client.clone())) } - /// Make a graphQL query against the server. - /// - /// Arguments: - /// * `query`: the query to make. - /// * `variables`: a dict of variables present on the query and their values. - /// - /// Returns: - /// The `data` field from the graphQL response. - fn query( - &self, - py: Python, - query: String, - variables: Option>, - ) -> PyResult> { - self.apply_if_alive(|handler| handler.client.query(py, query, variables)) - } - - /// Send a graph to the server. - /// - /// Arguments: - /// * `name`: the name of the graph sent. - /// * `graph`: the graph to send. - /// - /// Returns: - /// The `data` field from the graphQL response after executing the mutation. - fn send_graph( - &self, - py: Python, - name: String, - graph: MaterializedGraph, - ) -> PyResult> { - self.apply_if_alive(|handler| handler.client.send_graph(py, name, graph)) + /// Stop the server and wait for it to finish + pub(crate) fn stop(mut slf: PyRefMut, py: Python) -> PyResult<()> { + slf.stop_server(py) } +} - /// Set the server to load all the graphs from its path `path`. - /// - /// Arguments: - /// * `path`: the path to load the graphs from. - /// * `overwrite`: whether or not to overwrite existing graphs (defaults to False) - /// - /// Returns: - /// The `data` field from the graphQL response after executing the mutation. - #[pyo3(signature = (path, overwrite = false))] - fn load_graphs_from_path( - &self, - py: Python, - path: String, - overwrite: bool, - ) -> PyResult> { - self.apply_if_alive(|handler| handler.client.load_graphs_from_path(py, path, overwrite)) +fn is_online(url: &String) -> PyResult { + match reqwest::blocking::get(url) { + Ok(response) => Ok(response.status().as_u16() == 200), + _ => Ok(false), } } @@ -581,7 +588,7 @@ impl PyRunningRaphtoryServer { #[derive(Clone)] #[pyclass(name = "RaphtoryClient")] pub(crate) struct PyRaphtoryClient { - url: String, + pub(crate) url: String, } impl PyRaphtoryClient { @@ -659,13 +666,6 @@ impl PyRaphtoryClient { ))), } } - - fn is_online(&self) -> bool { - match reqwest::blocking::get(&self.url) { - Ok(response) => response.status().as_u16() == 200, - _ => false, - } - } } const WAIT_CHECK_INTERVAL_MILLIS: u64 = 200; @@ -677,32 +677,12 @@ impl PyRaphtoryClient { Self { url } } - /// Wait for the server to be online. + /// Check if the server is online. /// - /// Arguments: - /// * `millis`: the minimum number of milliseconds to wait (default 5000). - fn wait_for_online(&self, millis: Option) -> PyResult<()> { - let millis = millis.unwrap_or(5000); - let num_intervals = millis / WAIT_CHECK_INTERVAL_MILLIS; - - let mut online = false; - for _ in 0..num_intervals { - if self.is_online() { - online = true; - break; - } else { - sleep(Duration::from_millis(WAIT_CHECK_INTERVAL_MILLIS)) - } - } - - if online { - Ok(()) - } else { - Err(PyException::new_err(format!( - "Failed to connect to the server after {} milliseconds", - millis - ))) - } + /// Returns: + /// Returns true if server is online otherwise false. + fn is_server_online(&self) -> PyResult { + is_online(&self.url) } /// Make a graphQL query against the server. @@ -775,7 +755,7 @@ impl PyRaphtoryClient { /// /// Arguments: /// * `path`: the path to load the graphs from. - /// * `overwrite`: whether or not to overwrite existing graphs (defaults to False) + /// * `overwrite`: overwrite existing graphs (defaults to False) /// /// Returns: /// The `data` field from the graphQL response after executing the mutation. diff --git a/python/tests/test_algorithms.py b/python/tests/test_algorithms.py index 8c9880c8bf..35ea620beb 100644 --- a/python/tests/test_algorithms.py +++ b/python/tests/test_algorithms.py @@ -192,7 +192,6 @@ def test_algo_result(): expected_result = pd.DataFrame({"Key": [1], "Value": [1]}) row_with_one = df[df["Key"] == 1] row_with_one.reset_index(inplace=True, drop=True) - print(row_with_one) assert row_with_one.equals(expected_result) # Algo Str u64 actual = algorithms.weakly_connected_components(g) diff --git a/python/tests/test_graphdb.py b/python/tests/test_graphdb.py index fe732bd179..e507b76638 100644 --- a/python/tests/test_graphdb.py +++ b/python/tests/test_graphdb.py @@ -1872,7 +1872,6 @@ def test_layers_earliest_time(): g = Graph() e = g.add_edge(1, 1, 2, layer="test") e = g.edge(1, 2) - print(e) assert e.earliest_time == 1 @@ -1895,7 +1894,6 @@ def test_edge_explode_layers(): e_layers = [ee.layer_names for ee in layered_edges] e_layer_prop = [[str(ee.properties["layer"])] for ee in layered_edges] assert e_layers == e_layer_prop - print(e_layers) nested_layered_edges = g.nodes.out_edges.explode_layers() e_layers = [[ee.layer_names for ee in edges] for edges in nested_layered_edges] @@ -1904,11 +1902,8 @@ def test_edge_explode_layers(): for layered_edges in nested_layered_edges ] assert e_layers == e_layer_prop - print(e_layers) - print(g.nodes.out_neighbours.collect) nested_layered_edges = g.nodes.out_neighbours.out_edges.explode_layers() - print(nested_layered_edges) e_layers = [ [ee.layer_names for ee in layered_edges] for layered_edges in nested_layered_edges @@ -1918,7 +1913,6 @@ def test_edge_explode_layers(): for layered_edges in nested_layered_edges ] assert e_layers == e_layer_prop - print(e_layers) def test_starend_edges(): diff --git a/python/tests/test_graphql.py b/python/tests/test_graphql.py index ec3a5a6193..6cf0bc51b5 100644 --- a/python/tests/test_graphql.py +++ b/python/tests/test_graphql.py @@ -1,53 +1,99 @@ -import sys import tempfile -from raphtory import Graph from raphtory.graphql import RaphtoryServer, RaphtoryClient +from raphtory import graph_loader +from raphtory import Graph +import json + + +def test_failed_server_start_in_time(): + tmp_work_dir = tempfile.mkdtemp() + server = None + try: + server = RaphtoryServer(tmp_work_dir).start(timeout_in_milliseconds=1) + except Exception as e: + assert str(e) == "Failed to start server in 1 milliseconds" + finally: + if server: + server.stop() + + +def test_successful_server_start_in_time(): + tmp_work_dir = tempfile.mkdtemp() + server = RaphtoryServer(tmp_work_dir).start(timeout_in_milliseconds=3000) + client = server.get_client() + assert client.is_server_online() + server.stop() + assert not client.is_server_online() + + +def test_server_start_on_default_port(): + g = Graph() + g.add_edge(1, "ben", "hamza") + g.add_edge(2, "haaroon", "hamza") + g.add_edge(3, "ben", "haaroon") + + graphs = {"g": g} + tmp_work_dir = tempfile.mkdtemp() + server = RaphtoryServer(tmp_work_dir, graphs=graphs).start() + client = RaphtoryClient("http://localhost:1736") + + query = """{graph(name: "g") {nodes {list {name}}}}""" + assert client.query(query) == { + "graph": { + "nodes": {"list": [{"name": "ben"}, {"name": "hamza"}, {"name": "haaroon"}]} + } + } + + server.stop() + +def test_server_start_on_custom_port(): + g = Graph() + g.add_edge(1, "ben", "hamza") + g.add_edge(2, "haaroon", "hamza") + g.add_edge(3, "ben", "haaroon") + + graphs = {"g": g} + tmp_work_dir = tempfile.mkdtemp() + server = RaphtoryServer(tmp_work_dir, graphs=graphs).start(port=1737) + client = RaphtoryClient("http://localhost:1737") + + query = """{graph(name: "g") {nodes {list {name}}}}""" + assert client.query(query) == { + "graph": { + "nodes": {"list": [{"name": "ben"}, {"name": "hamza"}, {"name": "haaroon"}]} + } + } + + server.stop() + -def test_graphql2(): +def test_load_graphs_from_graph_paths_when_starting_server(): g1 = Graph() g1.add_edge(1, "ben", "hamza") g1.add_edge(2, "haaroon", "hamza") g1.add_edge(3, "ben", "haaroon") - + g1_file_path = tempfile.mkdtemp() + "/g1" + g1.save_to_file(g1_file_path) g2 = Graph() g2.add_edge(1, "Naomi", "Shivam") g2.add_edge(2, "Shivam", "Pedro") g2.add_edge(3, "Pedro", "Rachel") - graphs = {"g1": g1, "g2": g2} - - g3 = Graph() - g3.add_edge(1, "ben_saved", "hamza_saved") - g3.add_edge(2, "haaroon_saved", "hamza_saved") - g3.add_edge(3, "ben_saved", "haaroon_saved") - - g4 = Graph() - g4.add_edge(1, "Naomi_saved", "Shivam_saved") - g4.add_edge(2, "Shivam_saved", "Pedro_saved") - g4.add_edge(3, "Pedro_saved", "Rachel_saved") - - temp_dir = tempfile.mkdtemp() - - g3.save_to_file(temp_dir + "/g3") - g4.save_to_file(temp_dir + "/g4") + g2_file_path = tempfile.mkdtemp() + "/g2" + g2.save_to_file(g2_file_path) tmp_work_dir = tempfile.mkdtemp() - server = RaphtoryServer(tmp_work_dir, graphs=graphs).start(port=1751) - server.wait_for_online() + server = RaphtoryServer(tmp_work_dir, graph_paths=[g1_file_path, g2_file_path]).start() + client = server.get_client() query_g1 = """{graph(name: "g1") {nodes {list {name}}}}""" - query_g1_window = """{graph(name: "g1") {nodes {before(time: 2) {list {name}}}}}""" query_g2 = """{graph(name: "g2") {nodes {list {name}}}}""" - - assert server.query(query_g1) == { + assert client.query(query_g1) == { "graph": { "nodes": {"list": [{"name": "ben"}, {"name": "hamza"}, {"name": "haaroon"}]} } } - assert server.query(query_g1_window) == { - "graph": {"nodes": {"before": {"list": [{"name": "ben"}, {"name": "hamza"}]}}} - } - assert server.query(query_g2) == { + assert client.query(query_g2) == { "graph": { "nodes": { "list": [ @@ -61,116 +107,132 @@ def test_graphql2(): } server.stop() - server.wait() + +def test_send_graphs_to_server(): + g = Graph() + g.add_edge(1, "ben", "hamza") + g.add_edge(2, "haaroon", "hamza") + g.add_edge(3, "ben", "haaroon") + + tmp_work_dir = tempfile.mkdtemp() + server = RaphtoryServer(tmp_work_dir).start() + client = RaphtoryClient("http://localhost:1736") + client.send_graph(name="g", graph=g) -def test_graphqlclient(): - temp_dir = tempfile.mkdtemp() + query = """{graph(name: "g") {nodes {list {name}}}}""" + assert client.query(query) == { + "graph": { + "nodes": {"list": [{"name": "ben"}, {"name": "hamza"}, {"name": "haaroon"}]} + } + } + + server.stop() + +def test_load_graphs_from_path(): g1 = Graph() g1.add_edge(1, "ben", "hamza") g1.add_edge(2, "haaroon", "hamza") g1.add_edge(3, "ben", "haaroon") - graph_path = temp_dir + "/g1.bincode" - g1.save_to_file(graph_path) + g2 = Graph() + g2.add_edge(1, "Naomi", "Shivam") + g2.add_edge(2, "Shivam", "Pedro") + g2.add_edge(3, "Pedro", "Rachel") tmp_work_dir = tempfile.mkdtemp() - server = RaphtoryServer(tmp_work_dir, graph_paths=[graph_path]).start(port=1740) - client = RaphtoryClient("http://localhost:1740") - generic_client_test(client, temp_dir) - server.stop() - server.wait() + graphs = {"g1": g1, "g2": g2} + server = RaphtoryServer(tmp_work_dir, graphs=graphs).start() + client = server.get_client() - server2 = RaphtoryServer(tmp_work_dir, graph_paths=[graph_path]).start(port=1741) - client2 = RaphtoryClient("http://localhost:1741") - generic_client_test(client2, temp_dir) - server2.stop() - server2.wait() + g2 = Graph() + g2.add_edge(1, "shifu", "po") + g2.add_edge(2, "oogway", "phi") + g2.add_edge(3, "phi", "po") + tmp_dir = tempfile.mkdtemp() + g2_file_path = tmp_dir + "/g2" + g2.save_to_file(g2_file_path) - server3 = RaphtoryServer(tmp_work_dir, graph_paths=[graph_path]).start(port=1742) - client3 = RaphtoryClient("http://localhost:1742") - generic_client_test(client3, temp_dir) - server3.stop() - server3.wait() + # Since overwrite is False by default, it will not overwrite the existing graph g2 + client.load_graphs_from_path(tmp_dir) + query_g1 = """{graph(name: "g1") {nodes {list {name}}}}""" + query_g2 = """{graph(name: "g2") {nodes {list {name}}}}""" + assert client.query(query_g1) == { + "graph": { + "nodes": {"list": [{"name": "ben"}, {"name": "hamza"}, {"name": "haaroon"}]} + } + } + assert client.query(query_g2) == { + "graph": { + "nodes": { + "list": [ + {"name": "Naomi"}, + {"name": "Shivam"}, + {"name": "Pedro"}, + {"name": "Rachel"}, + ] + } + } + } -def generic_client_test(client, temp_dir): - client.wait_for_online() + server.stop() - # load a graph into the client from a path - res = client.load_graphs_from_path(temp_dir, True) - assert res == {"loadGraphsFromPath": ["g1.bincode"]} - # run a get nodes query and check the results - query = """query GetNodes($graphname: String!) { - graph(name: $graphname) { - nodes { - list { - name - } - } - } - }""" - variables = {"graphname": "g1.bincode"} - res = client.query(query, variables) - assert res == { +def test_load_graphs_from_path_overwrite(): + g1 = Graph() + g1.add_edge(1, "ben", "hamza") + g1.add_edge(2, "haaroon", "hamza") + g1.add_edge(3, "ben", "haaroon") + g2 = Graph() + g2.add_edge(1, "Naomi", "Shivam") + g2.add_edge(2, "Shivam", "Pedro") + g2.add_edge(3, "Pedro", "Rachel") + tmp_dir = tempfile.mkdtemp() + g2_file_path = tmp_dir + "/g2" + g2.save_to_file(g2_file_path) + + tmp_work_dir = tempfile.mkdtemp() + graphs = {"g1": g1, "g2": g2} + server = RaphtoryServer(tmp_work_dir, graphs=graphs).start() + client = server.get_client() + client.load_graphs_from_path(tmp_dir, True) + + query_g1 = """{graph(name: "g1") {nodes {list {name}}}}""" + query_g2 = """{graph(name: "g2") {nodes {list {name}}}}""" + assert client.query(query_g1) == { "graph": { "nodes": {"list": [{"name": "ben"}, {"name": "hamza"}, {"name": "haaroon"}]} } } - - # load a new graph into the client from a path - multi_graph_temp_dir = tempfile.mkdtemp() - g2 = Graph() - g2.add_edge(1, "ben", "hamza") - g2.add_edge(2, "haaroon", "hamza") - g2.save_to_file(multi_graph_temp_dir + "/g2.bincode") - g3 = Graph() - g3.add_edge(1, "shivam", "rachel") - g3.add_edge(2, "lucas", "shivam") - g3.save_to_file(multi_graph_temp_dir + "/g3.bincode") - res = client.load_graphs_from_path(multi_graph_temp_dir, True) - result_sorted = {"loadGraphsFromPath": sorted(res["loadGraphsFromPath"])} - assert result_sorted == {"loadGraphsFromPath": ["g2.bincode", "g3.bincode"]} - - # upload a graph - g4 = Graph() - g4.add_node(0, 1) - res = client.send_graph("hello", g4) - assert res == {"sendGraph": "hello"} - # Ensure the sent graph can be queried - query = """query GetNodes($graphname: String!) { - graph(name: $graphname) { - nodes { - list { - name - } + assert client.query(query_g2) == { + "graph": { + "nodes": { + "list": [ + {"name": "Naomi"}, + {"name": "Shivam"}, + {"name": "Pedro"}, + {"name": "Rachel"}, + ] } } - }""" - variables = {"graphname": "hello"} - res = client.query(query, variables) - assert res == {"graph": {"nodes": {"list": [{"name": "1"}]}}} - - -def test_windows_and_layers(): - from raphtory import graph_loader - from raphtory import Graph - import time - import json - from raphtory.graphql import RaphtoryServer - - g_lotr = graph_loader.lotr_graph() - g_lotr.add_constant_properties({"name": "lotr"}) - g_layers = Graph() - g_layers.add_constant_properties({"name": "layers"}) - g_layers.add_edge(1, 1, 2, layer="layer1") - g_layers.add_edge(1, 2, 3, layer="layer2") - hm = {"lotr": g_lotr, "layers": g_layers} + } + + server.stop() + + +def test_graph_windows_and_layers_query(): + g1 = graph_loader.lotr_graph() + g1.add_constant_properties({"name": "lotr"}) + g2 = Graph() + g2.add_constant_properties({"name": "layers"}) + g2.add_edge(1, 1, 2, layer="layer1") + g2.add_edge(1, 2, 3, layer="layer2") + tmp_work_dir = tempfile.mkdtemp() - server = RaphtoryServer(tmp_work_dir, graphs=hm).start() - client = RaphtoryClient("http://localhost:1736") - client.wait_for_online() + graphs = {"lotr": g1, "layers": g2} + server = RaphtoryServer(tmp_work_dir, graphs=graphs).start() + client = server.get_client() q = """ query GetEdges { graph(name: "lotr") { @@ -181,9 +243,7 @@ def test_windows_and_layers(): neighbours { list { name - before(time: 300) { - history - } + before(time: 300) { history } } } } @@ -198,30 +258,11 @@ def test_windows_and_layers(): "window": { "node": { "after": { - "history": [ - 555, - 562 - ], + "history": [555, 562], "neighbours": { "list": [ - { - "name": "Gandalf", - "before": { - "history": [ - 270 - ] - } - }, - { - "name": "Bilbo", - "before": { - "history": [ - 205, - 270, - 286 - ] - } - } + {"name": "Gandalf", "before": {"history": [270]}}, + {"name": "Bilbo", "before": {"history": [205, 270, 286]}} ] } } @@ -244,13 +285,7 @@ def test_windows_and_layers(): neighbours { list { name - layer(name: "layer2") { - neighbours { - list { - name - } - } - } + layer(name: "layer2") { neighbours { list { name } } } } } } @@ -265,20 +300,10 @@ def test_windows_and_layers(): "layer": { "name": "1", "neighbours": { - "list": [ - { + "list": [{ "name": "2", - "layer": { - "neighbours": { - "list": [ - { - "name": "3" - } - ] - } - } - } - ] + "layer": {"neighbours": {"list": [{ "name": "3" }]}} + }] } } } @@ -286,78 +311,43 @@ def test_windows_and_layers(): } """ - a = json.dumps(server.query(q)) + a = json.dumps(client.query(q)) json_a = json.loads(a) json_ra = json.loads(ra) assert json_a == json_ra + server.stop() - server.wait() - -def test_properties(): - from raphtory import Graph - import json - from raphtory.graphql import RaphtoryServer +def test_graph_properties_query(): g = Graph() - g.add_constant_properties({"name": "graph"}) - g.add_node( - 1, - 1, - { - "prop1": "val1", - "prop2": "val1", - "prop3": "val1", - "prop4": "val1", - }, - ) - g.add_node( - 2, - 1, - { - "prop1": "val2", - "prop2": "val2", - "prop3": "val2", - "prop4": "val2", - }, - ) - n = g.add_node( - 3, - 1, - { - "prop1": "val3", - "prop2": "val3", - "prop3": "val3", - "prop4": "val3", - }, - ) - n.add_constant_properties( - {"prop5": "val4", "prop6": "val4", "prop7": "val4", "prop8": "val4"} - ) - # g.save_to_file('/tmp/graphs/graph') + g.add_constant_properties({"name": "g"}) + g.add_node(1, 1, {"prop1": "val1", "prop2": "val1"}) + g.add_node(2, 1, {"prop1": "val2", "prop2": "val2"}) + n = g.add_node(3, 1, {"prop1": "val3", "prop2": "val3"}) + n.add_constant_properties({"prop5": "val4"}) tmp_work_dir = tempfile.mkdtemp() - server = RaphtoryServer(tmp_work_dir, graphs={"graph": g}).start() - client = RaphtoryClient("http://localhost:1736") - client.wait_for_online() + server = RaphtoryServer(tmp_work_dir, graphs={"g": g}).start() + client = server.get_client() q = """ query GetEdges { - graph(name: "graph") { + graph(name: "g") { nodes { - list{ + list { properties { - values(keys:["prop1","prop2"]){ + values(keys:["prop1"]) { key asString } - temporal{ - values(keys:["prop3","prop4"]){ + temporal { + values(keys:["prop2"]) { key history } } - constant{ - values(keys:["prop4","prop5","prop6"]){ + constant { + values(keys:["prop5"]) { key value } @@ -375,47 +365,12 @@ def test_properties(): "list": [ { "properties": { - "values": [ - { - "key": "prop2", - "asString": "val3" - }, - { - "key": "prop1", - "asString": "val3" - } - ], + "values": [{ "key": "prop1", "asString": "val3" }], "temporal": { - "values": [ - { - "key": "prop4", - "history": [ - 1, - 2, - 3 - ] - }, - { - "key": "prop3", - "history": [ - 1, - 2, - 3 - ] - } - ] + "values": [{"key": "prop2", "history": [1, 2, 3]}] }, "constant": { - "values": [ - { - "key": "prop5", - "value": "val4" - }, - { - "key": "prop6", - "value": "val4" - } - ] + "values": [{"key": "prop5", "value": "val4"}] } } } @@ -427,7 +382,6 @@ def test_properties(): s = client.query(q) json_a = json.loads(json.dumps(s)) json_ra = json.loads(r) - print(json_a) assert sorted( json_a["graph"]["nodes"]["list"][0]["properties"]["constant"]["values"], key=lambda x: x["key"], @@ -450,4 +404,3 @@ def test_properties(): key=lambda x: x["key"], ) server.stop() - server.wait() diff --git a/raphtory-graphql/config.toml b/raphtory-graphql/config.toml deleted file mode 100644 index 9ca6d7a71d..0000000000 --- a/raphtory-graphql/config.toml +++ /dev/null @@ -1,6 +0,0 @@ -[logging] -log_level = "INFO" - -[cache] -capacity = 30 -tti_seconds = 900 diff --git a/raphtory-graphql/src/data.rs b/raphtory-graphql/src/data.rs index 4a8e2802f6..cb2ad095ae 100644 --- a/raphtory-graphql/src/data.rs +++ b/raphtory-graphql/src/data.rs @@ -1,6 +1,6 @@ use crate::{ model::algorithms::global_plugins::GlobalPlugins, - server_config::{load_config, CacheConfig}, + server_config::{load_config, AppConfig, CacheConfig}, }; use async_graphql::Error; use dynamic_graphql::Result; @@ -31,16 +31,11 @@ pub struct Data { impl Data { pub fn new( work_dir: &Path, - maybe_graphs: Option>, - maybe_graph_paths: Option>, - maybe_cache_config: Option, + graphs: Option>, + graph_paths: Option>, + configs: &AppConfig, ) -> Self { - let cache_configs = if maybe_cache_config.is_some() { - maybe_cache_config.unwrap() - } else { - let app_config = load_config().expect("Failed to load config file"); - app_config.cache - }; + let cache_configs = &configs.cache; let graphs_cache_builder = Cache::builder() .max_capacity(cache_configs.capacity) @@ -50,10 +45,10 @@ impl Data { let graphs_cache: Arc>> = Arc::new(graphs_cache_builder); - save_graphs_to_work_dir(work_dir, &maybe_graphs.unwrap_or_default()) + save_graphs_to_work_dir(work_dir, &graphs.unwrap_or_default()) .expect("Failed to save graphs to work dir"); - load_graphs_from_paths(work_dir, maybe_graph_paths.unwrap_or_default(), true) + load_graphs_from_paths(work_dir, graph_paths.unwrap_or_default(), true) .expect("Failed to save graph paths to work dir"); Self { @@ -341,7 +336,7 @@ mod data_tests { get_graph_from_path, get_graphs_from_work_dir, load_graph_from_path, load_graphs_from_path, load_graphs_from_paths, save_graphs_to_work_dir, Data, }, - server_config::CacheConfig, + server_config::{AppConfig, CacheConfig, LoggingConfig}, }; use itertools::Itertools; #[cfg(feature = "storage")] @@ -901,14 +896,19 @@ mod data_tests { let graph_path3 = tmp_graph_dir.path().join("test_g2"); graph.save_to_file(&graph_path3).unwrap(); + let configs = AppConfig { + logging: LoggingConfig::default(), + cache: CacheConfig { + capacity: 1, + tti_seconds: 2, + }, + }; + let data = Data::new( tmp_work_dir.path(), None, Some(vec![graph_path1, graph_path2, graph_path3]), - Some(CacheConfig { - capacity: 1, - tti_seconds: 2, - }), + configs, ); assert!(!data.graphs.contains_key("test_dg")); diff --git a/raphtory-graphql/src/lib.rs b/raphtory-graphql/src/lib.rs index 7f0a4eeb3c..ffe4c8b3d2 100644 --- a/raphtory-graphql/src/lib.rs +++ b/raphtory-graphql/src/lib.rs @@ -38,7 +38,7 @@ pub fn url_decode_graph>(graph: T) -> Result>, +} + +impl GqlGraphs { + pub fn new(graphs: Vec) -> Self { + Self { + graphs: graphs + .into_iter() + .map(|g| g.into_dynamic_indexed()) + .collect(), + } + } +} + +#[ResolvedObjectFields] +impl GqlGraphs { + async fn names(&self) -> Vec { + self.graphs + .iter() + .map(|g| g.properties().constant().get("name").unwrap().to_string()) + .collect() + } + + async fn properties(&self) -> Vec { + self.graphs + .iter() + .map(|g| Into::::into(g.properties()).into()) + .collect() + } + + async fn unique_layers(&self) -> Vec> { + self.graphs + .iter() + .map(|g| g.unique_layers().map_into().collect()) + .collect() + } +} diff --git a/raphtory-graphql/src/model/graph/mod.rs b/raphtory-graphql/src/model/graph/mod.rs index 5d41731dce..7d67eb65c9 100644 --- a/raphtory-graphql/src/model/graph/mod.rs +++ b/raphtory-graphql/src/model/graph/mod.rs @@ -1,6 +1,7 @@ pub(crate) mod edge; mod edges; pub(crate) mod graph; +pub(crate) mod graphs; pub(crate) mod node; mod nodes; mod path_from_node; diff --git a/raphtory-graphql/src/model/mod.rs b/raphtory-graphql/src/model/mod.rs index 637cc10f34..8ca9c5694e 100644 --- a/raphtory-graphql/src/model/mod.rs +++ b/raphtory-graphql/src/model/mod.rs @@ -2,7 +2,7 @@ use crate::{ data::{load_graphs_from_path, Data}, model::{ algorithms::global_plugins::GlobalPlugins, - graph::{graph::GqlGraph, vectorised_graph::GqlVectorisedGraph}, + graph::{graph::GqlGraph, graphs::GqlGraphs, vectorised_graph::GqlVectorisedGraph}, }, }; use async_graphql::Context; @@ -17,7 +17,7 @@ use raphtory::{ core::{utils::errors::GraphError, ArcStr, Prop}, db::api::view::MaterializedGraph, prelude::{GraphViewOps, ImportOps, NodeViewOps, PropertyAdditionOps}, - search::IndexedGraph, + search::{into_indexed::DynamicIndexedGraph, IndexedGraph}, }; use serde_json::Value; use std::{ @@ -82,14 +82,14 @@ impl QueryRoot { Some(g.into()) } - // TODO: Impl this as GqlGraphs instead - async fn graphs<'a>(ctx: &Context<'a>) -> Result> { + async fn graphs<'a>(ctx: &Context<'a>) -> Result> { let data = ctx.data_unchecked::(); - Ok(data + let graphs = data .get_graphs()? .iter() - .map(|(name, g)| GqlGraph::new(name.to_string(), (*g).clone())) - .collect_vec()) + .map(|(_, g)| (*g).clone()) + .collect_vec(); + Ok(Some(GqlGraphs::new(graphs))) } async fn plugins<'a>(ctx: &Context<'a>) -> GlobalPlugins { diff --git a/raphtory-graphql/src/server.rs b/raphtory-graphql/src/server.rs index d1430b927a..31b700a01e 100644 --- a/raphtory-graphql/src/server.rs +++ b/raphtory-graphql/src/server.rs @@ -35,11 +35,11 @@ use raphtory::{ use crate::{ data::get_graphs_from_work_dir, - server_config::{load_config, CacheConfig}, + server_config::{load_config, AppConfig, AuthConfig, CacheConfig, LoggingConfig}, }; use std::{ collections::HashMap, - env, fs, + fs, path::{Path, PathBuf}, sync::{Arc, Mutex}, }; @@ -61,25 +61,28 @@ use tracing_subscriber::{ /// A struct for defining and running a Raphtory GraphQL server pub struct RaphtoryServer { data: Data, + configs: AppConfig, } impl RaphtoryServer { pub fn new( work_dir: &Path, - maybe_graphs: Option>, - maybe_graph_paths: Option>, - maybe_cache_config: Option, + graphs: Option>, + graph_paths: Option>, + cache_config: Option, + auth_config: Option, + config_path: Option<&Path>, ) -> Self { if !work_dir.exists() { fs::create_dir_all(work_dir).unwrap(); } - let data = Data::new( - work_dir, - maybe_graphs, - maybe_graph_paths, - maybe_cache_config, - ); - Self { data } + + let configs = + load_config(cache_config, auth_config, config_path).expect("Failed to load configs"); + + let data = Data::new(work_dir, graphs, graph_paths, &configs); + + Self { data, configs } } /// Vectorise a subset of the graphs of the server. @@ -162,11 +165,11 @@ impl RaphtoryServer { /// Start the server on the default port and return a handle to it. pub async fn start( self, - maybe_log_level: Option<&str>, + log_level: Option<&str>, enable_tracing: bool, enable_auth: bool, ) -> RunningRaphtoryServer { - self.start_with_port(1736, maybe_log_level, enable_tracing, enable_auth) + self.start_with_port(1736, log_level, enable_tracing, enable_auth) .await } @@ -174,7 +177,7 @@ impl RaphtoryServer { pub async fn start_with_port( self, port: u16, - maybe_log_level: Option<&str>, + log_level: Option<&str>, enable_tracing: bool, enable_auth: bool, ) -> RunningRaphtoryServer { @@ -202,9 +205,9 @@ impl RaphtoryServer { } } - fn setup_logger_from_config() { - let app_config = load_config().expect("Failed to load config file"); - let filter = EnvFilter::new(&app_config.logging.log_level); + fn setup_logger_from_config(configs: &LoggingConfig) { + let log_level = &configs.log_level; + let filter = EnvFilter::new(log_level); let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); if let Err(err) = tracing::subscriber::set_global_default(subscriber) { eprintln!( @@ -214,19 +217,19 @@ impl RaphtoryServer { } } - fn configure_logger(maybe_log_level: Option<&str>) { - if let Some(log_level) = maybe_log_level { + fn configure_logger(log_level: Option<&str>, configs: &LoggingConfig) { + if let Some(log_level) = log_level { if let Some(log_level) = parse_log_level(log_level) { setup_logger_from_loglevel(log_level); } else { - setup_logger_from_config(); + setup_logger_from_config(configs); } } else { - setup_logger_from_config(); + setup_logger_from_config(configs); } } - configure_logger(maybe_log_level); + configure_logger(log_level, &self.configs.logging); let registry = Registry::default().with(tracing_subscriber::fmt::layer().pretty()); let env_filter = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")); @@ -299,22 +302,25 @@ impl RaphtoryServer { }; dotenv().ok(); - println!("Loading env"); - let client_id_str = env::var("CLIENT_ID").expect("CLIENT_ID not set"); - let client_secret_str = env::var("CLIENT_SECRET").expect("CLIENT_SECRET not set"); - let tenant_id_str = env::var("TENANT_ID").expect("TENANT_ID not set"); + let client_id = self.configs.auth.client_id.expect("No client id provided"); + let client_secret = self + .configs + .auth + .client_secret + .expect("No client secret provided"); + let tenant_id = self.configs.auth.tenant_id.expect("No tenant id provided"); - let client_id = ClientId::new(client_id_str); - let client_secret = ClientSecret::new(client_secret_str); + let client_id = ClientId::new(client_id); + let client_secret = ClientSecret::new(client_secret); let auth_url = AuthUrl::new(format!( "https://login.microsoftonline.com/{}/oauth2/v2.0/authorize", - tenant_id_str.clone() + tenant_id.clone() )) .expect("Invalid authorization endpoint URL"); let token_url = TokenUrl::new(format!( "https://login.microsoftonline.com/{}/oauth2/v2.0/token", - tenant_id_str.clone() + tenant_id.clone() )) .expect("Invalid token endpoint URL"); @@ -370,8 +376,8 @@ impl RaphtoryServer { } /// Run the server on the default port until completion. - pub async fn run(self, maybe_log_level: Option<&str>, enable_tracing: bool) -> IoResult<()> { - self.start(maybe_log_level, enable_tracing, false) + pub async fn run(self, log_level: Option<&str>, enable_tracing: bool) -> IoResult<()> { + self.start(log_level, enable_tracing, false) .await .wait() .await @@ -379,10 +385,10 @@ impl RaphtoryServer { pub async fn run_with_auth( self, - maybe_log_level: Option<&str>, + log_level: Option<&str>, enable_tracing: bool, ) -> IoResult<()> { - self.start(maybe_log_level, enable_tracing, true) + self.start(log_level, enable_tracing, true) .await .wait() .await @@ -392,10 +398,10 @@ impl RaphtoryServer { pub async fn run_with_port( self, port: u16, - maybe_log_level: Option<&str>, + log_level: Option<&str>, enable_tracing: bool, ) -> IoResult<()> { - self.start_with_port(port, maybe_log_level, enable_tracing, false) + self.start_with_port(port, log_level, enable_tracing, false) .await .wait() .await @@ -464,17 +470,12 @@ mod server_tests { use crate::server::RaphtoryServer; use chrono::prelude::*; - use raphtory::{ - core::Prop, - prelude::{AdditionOps, Graph}, - }; - use std::collections::HashMap; use tokio::time::{sleep, Duration}; #[tokio::test] - async fn test_server_stop() { + async fn test_server_start_stop() { let tmp_dir = tempfile::tempdir().unwrap(); - let server = RaphtoryServer::new(tmp_dir.path(), None, None, None); + let server = RaphtoryServer::new(tmp_dir.path(), None, None, None, None, None); println!("calling start at time {}", Local::now()); let handler = server.start_with_port(0, None, false, false); sleep(Duration::from_secs(1)).await; diff --git a/raphtory-graphql/src/server_config.rs b/raphtory-graphql/src/server_config.rs index 6375a11047..89b22e4d72 100644 --- a/raphtory-graphql/src/server_config.rs +++ b/raphtory-graphql/src/server_config.rs @@ -1,28 +1,201 @@ -use config::{Config, ConfigError, Environment, File}; +use config::{Config, ConfigError, File}; use serde::Deserialize; +use std::path::Path; -#[derive(Debug, Deserialize)] -pub(crate) struct LoggingConfig { - pub(crate) log_level: String, +#[derive(Debug, Deserialize, PartialEq)] +pub struct LoggingConfig { + pub log_level: String, } -#[derive(Debug, Deserialize)] +impl Default for LoggingConfig { + fn default() -> Self { + LoggingConfig { + log_level: "INFO".to_string(), + } + } +} + +#[derive(Debug, Deserialize, PartialEq)] pub struct CacheConfig { pub capacity: u64, pub tti_seconds: u64, } -#[derive(Debug, Deserialize)] -pub(crate) struct AppConfig { - pub(crate) logging: LoggingConfig, - pub(crate) cache: CacheConfig, +impl Default for CacheConfig { + fn default() -> Self { + CacheConfig { + capacity: 30, + tti_seconds: 900, + } + } +} + +#[derive(Debug, Deserialize, PartialEq)] +pub struct AuthConfig { + pub client_id: Option, + pub client_secret: Option, + pub tenant_id: Option, +} + +impl Default for AuthConfig { + fn default() -> Self { + AuthConfig { + client_id: None, + client_secret: None, + tenant_id: None, + } + } +} + +#[derive(Debug, Deserialize, PartialEq)] +pub struct AppConfig { + pub logging: LoggingConfig, + pub cache: CacheConfig, + pub auth: AuthConfig, +} + +impl Default for AppConfig { + fn default() -> Self { + AppConfig { + logging: Default::default(), + cache: Default::default(), + auth: Default::default(), + } + } +} + +// Order of precedence of config loading: config args >> config path >> config default +// Note: Since config args takes precedence over config path, ensure not to provide config args when starting server from a compile rust instance. +// This would cause configs from config paths to be ignored. The reason it has been implemented so is to avoid having to pass all the configs as +// args from the python instance i.e., being able to provide configs from config path as default configs and yet give precedence to config args. +pub fn load_config( + cache_config: Option, + auth_config: Option, + config_path: Option<&Path>, +) -> Result { + let mut config_builder = Config::builder(); + if let Some(config_path) = config_path { + config_builder = config_builder.add_source(File::from(config_path)); + } + let settings = config_builder.build()?; + + // Load default configs + let mut loaded_config = AppConfig::default(); + + // Override with provided configs from config file if any + if let Some(log_level) = settings.get::("logging.log_level").ok() { + loaded_config.logging.log_level = log_level; + } + if let Some(capacity) = settings.get::("cache.capacity").ok() { + loaded_config.cache.capacity = capacity; + } + if let Some(tti_seconds) = settings.get::("cache.tti_seconds").ok() { + loaded_config.cache.tti_seconds = tti_seconds; + } + loaded_config.auth.client_id = settings.get::("auth.client_id").ok(); + loaded_config.auth.client_secret = settings.get::("auth.client_secret").ok(); + loaded_config.auth.tenant_id = settings.get::("auth.tenant_id").ok(); + + // Override with provided cache configs if any + if let Some(cache_config) = cache_config { + loaded_config.cache = cache_config; + } + + // Override with provided auth configs if any + if let Some(auth_config) = auth_config { + loaded_config.auth = auth_config; + } + + Ok(loaded_config) } -pub fn load_config() -> Result { - let settings = Config::builder() - .add_source(File::with_name("config")) - .add_source(Environment::with_prefix("APP")) - .build()?; +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + #[test] + fn test_load_config_from_toml() { + // Prepare a test TOML configuration file + let config_toml = r#" + [logging] + log_level = "DEBUG" + + [cache] + tti_seconds = 1000 + "#; + let config_path = Path::new("test_config.toml"); + fs::write(config_path, config_toml).unwrap(); + + // Load config using the test TOML file + let result = load_config(None, None, Some(config_path)); + let expected_config = AppConfig { + logging: LoggingConfig { + log_level: "DEBUG".to_string(), + }, + cache: CacheConfig { + capacity: 30, + tti_seconds: 1000, + }, + auth: AuthConfig::default(), + }; + + assert_eq!(result.unwrap(), expected_config); + + // Cleanup: delete the test TOML file + fs::remove_file(config_path).unwrap(); + } + + #[test] + fn test_load_config_with_custom_cache() { + // Prepare a custom cache configuration + let custom_cache = CacheConfig { + capacity: 50, + tti_seconds: 1200, + }; + + // Load config with custom cache configuration + let result = load_config(Some(custom_cache), None, None); + let expected_config = AppConfig { + logging: LoggingConfig { + log_level: "INFO".to_string(), + }, // Default logging level + cache: CacheConfig { + capacity: 50, + tti_seconds: 1200, + }, + auth: AuthConfig::default(), + }; + + assert_eq!(result.unwrap(), expected_config); + } + + #[test] + fn test_load_config_with_custom_auth() { + // Prepare a custom cache configuration + let custom_auth = AuthConfig { + client_id: Some("custom_client_id".to_string()), + client_secret: Some("custom_client_secret".to_string()), + tenant_id: Some("custom_tenant_id".to_string()), + }; + + // Load config with custom cache configuration + let result = load_config(None, Some(custom_auth), None); + let expected_config = AppConfig { + logging: LoggingConfig { + log_level: "INFO".to_string(), + }, // Default logging level + cache: CacheConfig { + capacity: 30, + tti_seconds: 900, + }, + auth: AuthConfig { + client_id: Some("custom_client_id".to_string()), + client_secret: Some("custom_client_secret".to_string()), + tenant_id: Some("custom_tenant_id".to_string()), + }, + }; - settings.try_deserialize::() + assert_eq!(result.unwrap(), expected_config); + } }