Todos/graphql (#1672)
* impl gqlgraphs

* properties, unique layers to gqlgraphs

* impl default configs, impl config load precedence, add tests

* impl auth configs

* wait for the server to come online when started, expose a client API to check whether the server is already online, and add tests (see the usage sketch after this list)

* add client tests

* fix tests

* add load graphs tests

* remove duplicate tests

* internalise the wait-for-online logic

* remove unnecessary auth config defaults
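
Taken together, these changes let a Python caller start the GraphQL server, block until it is reachable, and probe it from the client. A minimal usage sketch, assuming the bindings are exposed as RaphtoryServer under raphtory.graphql (the import path and literal values are not part of this diff):

# Usage sketch only; import path and values are assumptions.
from raphtory.graphql import RaphtoryServer

server = RaphtoryServer(work_dir="/tmp/graphs")
# start() now waits until the server answers over HTTP, or raises after the timeout.
running = server.start(port=1736, timeout_in_milliseconds=5000)
client = running.get_client()
assert client.is_server_online()
running.stop()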
shivam-880 authored Jun 28, 2024
1 parent b09f4fb commit 6528121
Showing 13 changed files with 623 additions and 476 deletions.
194 changes: 87 additions & 107 deletions python/src/graphql.rs
@@ -31,7 +31,7 @@ use raphtory_graphql::{
algorithm_entry_point::AlgorithmEntryPoint, document::GqlDocument,
global_plugins::GlobalPlugins, vector_algorithms::VectorAlgorithms,
},
server_config::CacheConfig,
server_config::*,
url_encode_graph, RaphtoryServer,
};
use reqwest::Client;
@@ -222,14 +222,17 @@ impl PyRaphtoryServer {
impl PyRaphtoryServer {
#[new]
#[pyo3(
signature = (work_dir, graphs = None, graph_paths = None, cache_capacity = 30, cache_tti_seconds = 900)
signature = (work_dir, graphs = None, graph_paths = None, cache_capacity = 30, cache_tti_seconds = 900, client_id = None, client_secret = None, tenant_id = None)
)]
fn py_new(
work_dir: String,
graphs: Option<HashMap<String, MaterializedGraph>>,
graph_paths: Option<Vec<String>>,
cache_capacity: u64,
cache_tti_seconds: u64,
client_id: Option<String>,
client_secret: Option<String>,
tenant_id: Option<String>,
) -> PyResult<Self> {
let graph_paths = graph_paths.map(|paths| paths.into_iter().map(PathBuf::from).collect());
let server = RaphtoryServer::new(
@@ -240,6 +243,12 @@ impl PyRaphtoryServer {
capacity: cache_capacity,
tti_seconds: cache_tti_seconds,
}),
Some(AuthConfig {
client_id,
client_secret,
tenant_id,
}),
None,
);
Ok(PyRaphtoryServer::new(server))
}
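
From Python, the new keyword arguments map directly onto the AuthConfig fields above. A sketch with placeholder values (everything besides the keyword names is an assumption):

# Sketch: construct a server with cache and auth settings (placeholder values).
from raphtory.graphql import RaphtoryServer  # import path assumed

server = RaphtoryServer(
    work_dir="/tmp/graphs",
    cache_capacity=30,
    cache_tti_seconds=900,
    client_id="my-client-id",
    client_secret="my-client-secret",
    tenant_id="my-tenant-id",
)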
@@ -350,15 +359,21 @@ impl PyRaphtoryServer {
///
/// Arguments:
/// * `port`: the port to use (defaults to 1736).
/// * `log_level`: set log level (defaults to INFO).
/// * `enable_tracing`: enable tracing (defaults to False).
/// * `enable_auth`: enable authentication (defaults to False).
/// * `timeout_in_milliseconds`: how long to wait for the server to come online, in milliseconds (defaults to 5000). If the server is not online within this timeout it is stopped, even if it would have come online just after the timeout elapsed.
#[pyo3(
signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false)
signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false, timeout_in_milliseconds = None)
)]
pub fn start(
slf: PyRefMut<Self>,
py: Python,
port: u16,
log_level: String,
enable_tracing: bool,
enable_auth: bool,
timeout_in_milliseconds: Option<u64>,
) -> PyResult<PyRunningRaphtoryServer> {
let (sender, receiver) = crossbeam_channel::bounded::<BridgeCommand>(1);
let server = take_server_ownership(slf)?;
@@ -389,15 +404,29 @@ impl PyRaphtoryServer {
})
});

Ok(PyRunningRaphtoryServer::new(join_handle, sender, port))
let mut server = PyRunningRaphtoryServer::new(join_handle, sender, port);
if let Some(server_handler) = &server.server_handler {
match PyRunningRaphtoryServer::wait_for_server_online(
&server_handler.client.url,
timeout_in_milliseconds,
) {
Ok(_) => return Ok(server),
Err(e) => {
PyRunningRaphtoryServer::stop_server(&mut server, py)?;
Err(e)
}
}
} else {
Err(PyException::new_err("Failed to start server"))
}
}
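
If the server is not reachable within the timeout, start() stops it and surfaces the error as a plain Python exception. A hedged sketch of handling that failure:

# Sketch: handle a server that never comes online within the timeout.
from raphtory.graphql import RaphtoryServer  # import path assumed

server = RaphtoryServer(work_dir="/tmp/graphs")
try:
    running = server.start(port=1736, timeout_in_milliseconds=2000)
except Exception as err:  # the binding raises a plain Exception via PyException::new_err
    print(f"server failed to come online: {err}")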

/// Run the server until completion.
///
/// Arguments:
/// * `port`: the port to use (defaults to 1736).
#[pyo3(
signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false)
signature = (port = 1736, log_level = "INFO".to_string(), enable_tracing = false, enable_auth = false, timeout_in_milliseconds = None)
)]
pub fn run(
slf: PyRefMut<Self>,
@@ -406,9 +435,18 @@ impl PyRaphtoryServer {
log_level: String,
enable_tracing: bool,
enable_auth: bool,
timeout_in_milliseconds: Option<u64>,
) -> PyResult<()> {
let mut server =
Self::start(slf, port, log_level, enable_tracing, enable_auth)?.server_handler;
let mut server = Self::start(
slf,
py,
port,
log_level,
enable_tracing,
enable_auth,
timeout_in_milliseconds,
)?
.server_handler;
py.allow_threads(|| wait_server(&mut server))
}
}
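
run() is now a thin wrapper that forwards its arguments, including the new timeout, to start() and then blocks on the running server. A sketch of the blocking entry point:

# Sketch: run the server in the foreground until it is stopped externally.
from raphtory.graphql import RaphtoryServer  # import path assumed

RaphtoryServer(work_dir="/tmp/graphs").run(
    port=1736,
    log_level="INFO",
    timeout_in_milliseconds=5000,
)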
@@ -495,93 +533,62 @@ impl PyRunningRaphtoryServer {
None => Err(PyException::new_err(RUNNING_SERVER_CONSUMED_MSG)),
}
}
}

#[pymethods]
impl PyRunningRaphtoryServer {
/// Stop the server.
pub(crate) fn stop(&self) -> PyResult<()> {
self.apply_if_alive(|handler| {
fn wait_for_server_online(url: &String, timeout_in_milliseconds: Option<u64>) -> PyResult<()> {
let millis = timeout_in_milliseconds.unwrap_or(5000);
let num_intervals = millis / WAIT_CHECK_INTERVAL_MILLIS;

for _ in 0..num_intervals {
if is_online(url)? {
return Ok(());
} else {
sleep(Duration::from_millis(WAIT_CHECK_INTERVAL_MILLIS))
}
}

Err(PyException::new_err(format!(
"Failed to start server in {} milliseconds",
millis
)))
}

fn stop_server(&mut self, py: Python) -> PyResult<()> {
Self::apply_if_alive(self, |handler| {
handler
.sender
.send(BridgeCommand::StopServer)
.expect("Failed when sending cancellation signal");
Ok(())
})
}

/// Wait until server completion.
pub(crate) fn wait(mut slf: PyRefMut<Self>, py: Python) -> PyResult<()> {
let server = &mut slf.server_handler;
})?;
let server = &mut self.server_handler;
py.allow_threads(|| wait_server(server))
}
}

/// Wait for the server to be online.
///
/// Arguments:
/// * `timeout_millis`: the timeout in milliseconds (default 5000).
fn wait_for_online(&self, timeout_millis: Option<u64>) -> PyResult<()> {
self.apply_if_alive(|handler| handler.client.wait_for_online(timeout_millis))
#[pymethods]
impl PyRunningRaphtoryServer {
pub(crate) fn get_client(&self) -> PyResult<PyRaphtoryClient> {
self.apply_if_alive(|handler| Ok(handler.client.clone()))
}

/// Make a graphQL query against the server.
///
/// Arguments:
/// * `query`: the query to make.
/// * `variables`: a dict of variables present on the query and their values.
///
/// Returns:
/// The `data` field from the graphQL response.
fn query(
&self,
py: Python,
query: String,
variables: Option<HashMap<String, PyObject>>,
) -> PyResult<HashMap<String, PyObject>> {
self.apply_if_alive(|handler| handler.client.query(py, query, variables))
}

/// Send a graph to the server.
///
/// Arguments:
/// * `name`: the name of the graph sent.
/// * `graph`: the graph to send.
///
/// Returns:
/// The `data` field from the graphQL response after executing the mutation.
fn send_graph(
&self,
py: Python,
name: String,
graph: MaterializedGraph,
) -> PyResult<HashMap<String, PyObject>> {
self.apply_if_alive(|handler| handler.client.send_graph(py, name, graph))
/// Stop the server and wait for it to finish
pub(crate) fn stop(mut slf: PyRefMut<Self>, py: Python) -> PyResult<()> {
slf.stop_server(py)
}
}
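
Queries and graph uploads no longer live on the running-server handle; they go through the client returned by get_client(). A sketch of the migrated calls (import paths and the query text are illustrative only):

# Sketch: operations that used to sit on the running server now go via the client.
from raphtory import Graph                   # import path assumed
from raphtory.graphql import RaphtoryServer  # import path assumed

g = Graph()
g.add_edge(1, 1, 2)
running = RaphtoryServer(work_dir="/tmp/graphs").start(timeout_in_milliseconds=5000)
client = running.get_client()
client.send_graph("example", g)      # upload the graph to the server
client.query("{ graphs { name } }")  # illustrative query text only
running.stop()                       # stop() now also waits for the server to finish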

/// Set the server to load all the graphs from its path `path`.
///
/// Arguments:
/// * `path`: the path to load the graphs from.
/// * `overwrite`: whether or not to overwrite existing graphs (defaults to False)
///
/// Returns:
/// The `data` field from the graphQL response after executing the mutation.
#[pyo3(signature = (path, overwrite = false))]
fn load_graphs_from_path(
&self,
py: Python,
path: String,
overwrite: bool,
) -> PyResult<HashMap<String, PyObject>> {
self.apply_if_alive(|handler| handler.client.load_graphs_from_path(py, path, overwrite))
fn is_online(url: &String) -> PyResult<bool> {
match reqwest::blocking::get(url) {
Ok(response) => Ok(response.status().as_u16() == 200),
_ => Ok(false),
}
}

/// A client for handling GraphQL operations in the context of Raphtory.
#[derive(Clone)]
#[pyclass(name = "RaphtoryClient")]
pub(crate) struct PyRaphtoryClient {
url: String,
pub(crate) url: String,
}

impl PyRaphtoryClient {
@@ -659,13 +666,6 @@ impl PyRaphtoryClient {
))),
}
}

fn is_online(&self) -> bool {
match reqwest::blocking::get(&self.url) {
Ok(response) => response.status().as_u16() == 200,
_ => false,
}
}
}

const WAIT_CHECK_INTERVAL_MILLIS: u64 = 200;
@@ -677,32 +677,12 @@ impl PyRaphtoryClient {
Self { url }
}

/// Wait for the server to be online.
/// Check if the server is online.
///
/// Arguments:
/// * `millis`: the minimum number of milliseconds to wait (default 5000).
fn wait_for_online(&self, millis: Option<u64>) -> PyResult<()> {
let millis = millis.unwrap_or(5000);
let num_intervals = millis / WAIT_CHECK_INTERVAL_MILLIS;

let mut online = false;
for _ in 0..num_intervals {
if self.is_online() {
online = true;
break;
} else {
sleep(Duration::from_millis(WAIT_CHECK_INTERVAL_MILLIS))
}
}

if online {
Ok(())
} else {
Err(PyException::new_err(format!(
"Failed to connect to the server after {} milliseconds",
millis
)))
}
/// Returns:
/// true if the server is online, otherwise false.
fn is_server_online(&self) -> PyResult<bool> {
is_online(&self.url)
}
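
wait_for_online on the client has been replaced by a non-blocking is_server_online(), so callers that want to block can poll it themselves. A sketch, assuming the client can be constructed directly from a URL:

# Sketch: poll an already-running server from a standalone client.
import time

from raphtory.graphql import RaphtoryClient  # import path and constructor assumed

client = RaphtoryClient("http://localhost:1736")
for _ in range(25):  # roughly 5 seconds at 200 ms intervals
    if client.is_server_online():
        break
    time.sleep(0.2)
else:
    raise RuntimeError("server did not come online")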

/// Make a graphQL query against the server.
Expand Down Expand Up @@ -775,7 +755,7 @@ impl PyRaphtoryClient {
///
/// Arguments:
/// * `path`: the path to load the graphs from.
/// * `overwrite`: whether or not to overwrite existing graphs (defaults to False)
/// * `overwrite`: overwrite existing graphs (defaults to False)
///
/// Returns:
/// The `data` field from the graphQL response after executing the mutation.
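
The client keeps its load_graphs_from_path mutation; a short sketch with overwrite enabled (the path and client construction are placeholders):

# Sketch: ask the server to load every graph found under a path, replacing existing ones.
from raphtory.graphql import RaphtoryClient  # import path and constructor assumed

client = RaphtoryClient("http://localhost:1736")
data = client.load_graphs_from_path("/tmp/graphs/extra", overwrite=True)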
1 change: 0 additions & 1 deletion python/tests/test_algorithms.py
@@ -192,7 +192,6 @@ def test_algo_result():
expected_result = pd.DataFrame({"Key": [1], "Value": [1]})
row_with_one = df[df["Key"] == 1]
row_with_one.reset_index(inplace=True, drop=True)
print(row_with_one)
assert row_with_one.equals(expected_result)
# Algo Str u64
actual = algorithms.weakly_connected_components(g)
6 changes: 0 additions & 6 deletions python/tests/test_graphdb.py
@@ -1872,7 +1872,6 @@ def test_layers_earliest_time():
g = Graph()
e = g.add_edge(1, 1, 2, layer="test")
e = g.edge(1, 2)
print(e)
assert e.earliest_time == 1


@@ -1895,7 +1894,6 @@ def test_edge_explode_layers():
e_layers = [ee.layer_names for ee in layered_edges]
e_layer_prop = [[str(ee.properties["layer"])] for ee in layered_edges]
assert e_layers == e_layer_prop
print(e_layers)

nested_layered_edges = g.nodes.out_edges.explode_layers()
e_layers = [[ee.layer_names for ee in edges] for edges in nested_layered_edges]
@@ -1904,11 +1902,8 @@ def test_edge_explode_layers():
for layered_edges in nested_layered_edges
]
assert e_layers == e_layer_prop
print(e_layers)

print(g.nodes.out_neighbours.collect)
nested_layered_edges = g.nodes.out_neighbours.out_edges.explode_layers()
print(nested_layered_edges)
e_layers = [
[ee.layer_names for ee in layered_edges]
for layered_edges in nested_layered_edges
Expand All @@ -1918,7 +1913,6 @@ def test_edge_explode_layers():
for layered_edges in nested_layered_edges
]
assert e_layers == e_layer_prop
print(e_layers)


def test_starend_edges():