diff --git a/.github/workflows/_release_rust.yml b/.github/workflows/_release_rust.yml index ffa4783552..a05fcdceb9 100644 --- a/.github/workflows/_release_rust.yml +++ b/.github/workflows/_release_rust.yml @@ -54,8 +54,12 @@ jobs: with: command: install args: cargo-release --force - - name: "Hide arrow dependencies" - run: ./scripts/hide_arrow_dep.sh + - name: "Publish raphtory-arrow to crates.io" + if: ${{ !inputs.dry_run }} + uses: actions-rs/cargo@v1 + with: + command: publish + args: --token ${{ secrets.CRATES_TOKEN }} --package raphtory-arrow --allow-dirty - name: "Publish raphtory to crates.io" if: ${{ !inputs.dry_run }} uses: actions-rs/cargo@v1 diff --git a/.github/workflows/test_python_workflow.yml b/.github/workflows/test_python_workflow.yml index b78676a02f..02aa9eaa50 100644 --- a/.github/workflows/test_python_workflow.yml +++ b/.github/workflows/test_python_workflow.yml @@ -59,6 +59,8 @@ jobs: with: python-version: ${{ matrix.python }} cache: 'pip' + - name: Flip raphtory-arrow in Cargo.toml + run: python ./scripts/flip_ra.py Cargo.toml - name: Run Maturin develop uses: PyO3/maturin-action@v1 with: diff --git a/.github/workflows/test_rust_workflow.yml b/.github/workflows/test_rust_workflow.yml index 07c5d68df7..680df601e2 100644 --- a/.github/workflows/test_rust_workflow.yml +++ b/.github/workflows/test_rust_workflow.yml @@ -52,6 +52,8 @@ jobs: name: Cargo cache with: cache-all-crates: true + - name: Flip raphtory-arrow in Cargo.toml + run: python ./scripts/flip_ra.py - name: Install bininstall uses: cargo-bins/cargo-binstall@main - name: Install nextest @@ -61,7 +63,7 @@ jobs: RUSTFLAGS: -Awarnings TEMPDIR: ${{ runner.temp }} run: | - cargo nextest run --all --no-default-features + cargo nextest run --all --no-default-features --features "arrow" - name: Run Tests (features=io) env: RUSTFLAGS: -Awarnings diff --git a/Cargo.lock b/Cargo.lock index a385ab7f09..bd9dc4cfbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2122,16 +2122,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "graph500" -version = "0.8.1" -dependencies = [ - "raphtory", - "raphtory-arrow", - "raphtory-storage", - "serde_json", -] - [[package]] name = "h2" version = "0.3.26" @@ -2525,16 +2515,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "lanl" -version = "0.8.1" -dependencies = [ - "pyo3", - "raphtory", - "raphtory-arrow", - "raphtory-storage", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -4031,42 +4011,25 @@ dependencies = [ [[package]] name = "raphtory-arrow" version = "0.8.1" -source = "git+ssh://git@github.com/Pometry/raphtory-arrow.git?branch=master#a50946715f4ed7e7644f2eea851a64d938137142" -dependencies = [ - "ahash", - "bincode", - "bytemuck", - "itertools 0.12.1", - "memmap2", - "num-traits", - "once_cell", - "parking_lot", - "polars-arrow", - "polars-parquet", - "polars-utils", - "rayon", - "serde", - "serde_json", - "strum 0.26.2", - "tempfile", - "thiserror", - "tracing", - "tracing-subscriber", - "twox-hash", -] [[package]] name = "raphtory-benchmark" version = "0.8.1" dependencies = [ + "chrono", + "clap", "criterion", + "csv", + "flate2", "polars-arrow", "rand 0.8.5", "raphtory", "raphtory-arrow", + "raphtory-graphql", "rayon", "sorted_vector_map", "tempfile", + "tokio", ] [[package]] @@ -4148,43 +4111,12 @@ dependencies = [ "pyo3-build-config", "raphtory", "raphtory-graphql", - "raphtory-storage", "reqwest", "serde", 
"serde_json", "tokio", ] -[[package]] -name = "raphtory-rust-benchmark" -version = "0.8.1" -dependencies = [ - "chrono", - "clap", - "csv", - "flate2", - "raphtory", -] - -[[package]] -name = "raphtory-storage" -version = "0.8.1" -dependencies = [ - "ahash", - "dashmap", - "itertools 0.12.1", - "parking_lot", - "polars-arrow", - "proptest", - "pyo3", - "raphtory", - "raphtory-arrow", - "rayon", - "serde", - "serde_json", - "tempfile", -] - [[package]] name = "raw-cpuid" version = "11.0.2" @@ -4413,14 +4345,6 @@ dependencies = [ "serde", ] -[[package]] -name = "run_server" -version = "0.8.1" -dependencies = [ - "raphtory-graphql", - "tokio", -] - [[package]] name = "rust-ini" version = "0.19.0" diff --git a/Cargo.toml b/Cargo.toml index a7798b7bc7..b0c51900e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,14 @@ [workspace] members = [ "raphtory", - "raphtory-storage", "raphtory-cypher", "raphtory-benchmark", + "raphtory-arrow", "examples/rust", "examples/netflow", "python", "js-raphtory", "raphtory-graphql", - "comparison-benchmark/rust/raphtory-rust-benchmark", - "comparison-benchmark/graphql-benchmark/run_server", - "examples/lanl", - "examples/graph500" ] default-members = ["raphtory"] resolver = "2" @@ -134,9 +130,6 @@ arrow-schema = { version = "50" } arrow-data = { version = "50" } arrow-array = { version = "50" } -# raphtory-arrow = { path = "../raphtory-arrow" } -raphtory-arrow = { git = "ssh://git@github.com/Pometry/raphtory-arrow.git", branch = "master" } - # Make sure that transitive dependencies stick to arrow 50 [patch.crates-io] arrow = { git = "https://github.com/apache/arrow-rs.git", tag = "50.0.0" } diff --git a/Makefile b/Makefile index b1b4fa8f30..706a308552 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,6 @@ rust-build-docs: cargo doc --no-deps -p raphtory -q rust-build-readthedocs: - ./scripts/hide_arrow_dep.sh cargo doc --no-deps -p raphtory -q --target-dir $(RUST_READTHEDOCS_DOCS_TARGET) rm -rf $(RUST_READTHEDOCS_DOCS_TARGET)/debug mv $(RUST_READTHEDOCS_DOCS_TARGET)/doc/* $(RUST_READTHEDOCS_DOCS_TARGET) diff --git a/comparison-benchmark/graphql-benchmark/run_server/Cargo.toml b/comparison-benchmark/graphql-benchmark/run_server/Cargo.toml deleted file mode 100644 index 73903efdc6..0000000000 --- a/comparison-benchmark/graphql-benchmark/run_server/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "run_server" -version.workspace = true -documentation.workspace = true -repository.workspace = true -license.workspace = true -readme.workspace = true -homepage.workspace = true -keywords.workspace = true -authors.workspace = true -rust-version.workspace = true -edition.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -raphtory-graphql = { path = "../../../raphtory-graphql", version = "0.8.1" } -tokio = { workspace = true } - diff --git a/comparison-benchmark/rust/raphtory-rust-benchmark/Cargo.toml b/comparison-benchmark/rust/raphtory-rust-benchmark/Cargo.toml deleted file mode 100644 index 9ec32646b9..0000000000 --- a/comparison-benchmark/rust/raphtory-rust-benchmark/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "raphtory-rust-benchmark" -description = "Raphtory Quick Rust Benchmark" -edition.workspace = true -rust-version.workspace = true -version.workspace = true -keywords.workspace = true -authors.workspace = true -documentation.workspace = true -repository.workspace = true -license.workspace = true -readme.workspace = true -homepage.workspace = true - -# 
See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -raphtory = { path = "../../../raphtory", package = "raphtory", features = ["io"] } -chrono = { workspace = true } -clap = { workspace = true } -csv = { workspace = true } -flate2 = { workspace = true } diff --git a/examples/graph500/Cargo.toml b/examples/graph500/Cargo.toml deleted file mode 100644 index f5e6b93b5e..0000000000 --- a/examples/graph500/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "graph500" -version.workspace = true -documentation.workspace = true -repository.workspace = true -license.workspace = true -readme.workspace = true -homepage.workspace = true -keywords.workspace = true -authors.workspace = true -rust-version.workspace = true -edition.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -raphtory = {path = "../../raphtory", features = ["python"]} -raphtory-arrow.workspace = true -raphtory-storage = {path = "../../raphtory-storage"} -serde_json.workspace = true diff --git a/examples/graph500/python/main.py b/examples/graph500/python/main.py deleted file mode 100644 index dc570fe022..0000000000 --- a/examples/graph500/python/main.py +++ /dev/null @@ -1,128 +0,0 @@ -import argparse -from raphtory import ArrowGraph -from raphtory import algorithms -import time -from typing import TypeVar, Callable -from tqdm import tqdm -import os -import sys -from raphtory import ArrowGraph, Query, State - - -B = TypeVar('B') - -def measure(name: str, f: Callable[..., B], *args, print_result: bool = True) -> B: - start_time = time.time() - result = f(*args) - elapsed_time = time.time() - start_time - - time_unit = "s" - elapsed_time_display = elapsed_time - if elapsed_time < 1: - time_unit = "ms" - elapsed_time_display *= 1000 - - if print_result: - print(f"Running {name}: time: {elapsed_time_display:.3f}{time_unit}, result: {result}") - else: - print(f"Running {name}: time: {elapsed_time_display:.3f}{time_unit}") - - return result - -def hop_queries(graph, nodes, layer, hops, temporal, start=None, keep_path=False): - q = Query.from_node_ids(nodes) - - for _ in range(hops): - q = q.out(layer) - - if temporal: - max_duration = 2**31 - 1 - state = State.path_window(keep_path=keep_path, start_t=start, duration=max_duration) - else: - state = State.no_state() - - q.run(graph, state) - -def static_multi_hop_single_node_query(graph: ArrowGraph): - hop_queries(graph, [0], "default", 3, False) - -def static_multi_hop_multi_node_query(graph: ArrowGraph): - nodes = list(range(100)) - hop_queries(graph, nodes, "default", 3, False) - -def multi_hop_single_node_query(graph: ArrowGraph): - hop_queries(graph, [0], "default", 3, True, 10, True) - -def multi_hop_multi_node_query(graph: ArrowGraph): - nodes = list(range(100)) - hop_queries(graph, nodes, "default", 3, True, 10, True) - -def main(graph_dir, resources_dir, chunk_size, t_props_chunk_size, read_chunk_size, concurrent_files, num_threads): - - if resources_dir is None or graph_dir is None: - raise ValueError("Both 'resources_dir' and 'target_dir' environment variables must be set.") - - print(f"Resources directory: {resources_dir}") - print(f"Target directory: {graph_dir}") - print(f"Chunk size: {chunk_size}") - print(f"t_props chunk size: {t_props_chunk_size}") - print(f"Read chunk size: {read_chunk_size}") - print(f"Concurrent files: {concurrent_files}") - print(f"Number of threads: {num_threads}") - - layer_parquet_cols = [ - { - 
"parquet_dir": resources_dir, - "layer": "default", - "src_col": "src", - "dst_col": "dst", - "time_col": "time", - } - ] - - # # Read the Parquet file - # table = pq.read_table(parquet_dir + '/part-00000-8b31eaa4-2bd9-4f07-b61c-a353aed2af22-c000.snappy.parquet') - # print(table.schema) - - print() - try: - g = measure("Graph load from dir", ArrowGraph.load_from_dir, graph_dir, print_result=False) - except Exception as e: - g = measure( - "Graph load from parquets", - ArrowGraph.load_from_parquets, - graph_dir, - layer_parquet_cols, - None, - chunk_size, - t_props_chunk_size, - read_chunk_size, - concurrent_files, - num_threads, - print_result=False - ) - - print("Nodes count =", g.count_nodes()) - print("Edges count =", g.count_edges()) - print("Earliest time =", g.earliest_time) - print("Latest time =", g.latest_time) - - measure("Static Multi Hop Single Node Query", static_multi_hop_single_node_query, g, print_result=False) - measure("Static Multi Hop Multi Node Query", static_multi_hop_multi_node_query, g, print_result=False) - measure("Temporal Multi Hop Singl Node Query", multi_hop_single_node_query, g, print_result=False) - measure("Temporal Multi Hop Multi Node Query", multi_hop_multi_node_query, g, print_result=False) - measure("CC", algorithms.connected_components, g, print_result=False) - measure("Page Rank", algorithms.pagerank, g, 100, print_result=False) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Run LANL example queries') - parser.add_argument('--graph-dir', type=str, help='The directory of the graph') - parser.add_argument('--resources-dir', type=str, help='paths to the parquet directory') - parser.add_argument('--chunk-size', type=int, default=268435456, help='Chunk size') - parser.add_argument('--t-props-chunk-size', type=int, default=20000000, help='t_props chunk size') - parser.add_argument('--read-chunk-size', type=int, default=4000000, help='Read chunk size') - parser.add_argument('--concurrent-files', type=int, default=1, help='Concurrent files') - parser.add_argument('--num-threads', type=int, default=4, help='Number of threads') - args = parser.parse_args() - main(args.graph_dir, args.resources_dir, args.chunk_size, args.t_props_chunk_size, args.read_chunk_size, args.concurrent_files, args.num_threads) diff --git a/examples/graph500/src/main.rs b/examples/graph500/src/main.rs deleted file mode 100644 index e94d7b8d0a..0000000000 --- a/examples/graph500/src/main.rs +++ /dev/null @@ -1,203 +0,0 @@ -use raphtory::{ - algorithms::{ - centrality::pagerank::unweighted_page_rank, components::weakly_connected_components, - }, - arrow::{ - graph_impl::{ArrowGraph, ParquetLayerCols}, - query::{ast::Query, executors::rayon2, state::NoState, ForwardState, NodeSource}, - }, - core::entities::VID, - prelude::GraphViewOps, -}; -use raphtory_arrow::algorithms::connected_components; -use raphtory_storage::lanl::*; -use std::{any::Any, env, io::Write, num::NonZeroUsize, vec}; - -fn main() { - // Retrieve named parameters from environment variables - let resources_dir = - env::var("resources_dir").expect("Environment variable 'resources_dir' not set"); - let target_dir = env::var("target_dir").expect("Environment variable 'target_dir' not set"); - - // Set default values and then try to override them with environment variables if they exist - let chunk_size: usize = env::var("chunk_size") - .unwrap_or_else(|_| "268435456".to_string()) - .parse() - .expect("Invalid value for 'chunk_size'"); - let t_props_chunk_size: usize = 
env::var("t_props_chunk_size") - .unwrap_or_else(|_| "20000000".to_string()) - .parse() - .expect("Invalid value for 't_props_chunk_size'"); - let read_chunk_size: usize = env::var("read_chunk_size") - .unwrap_or_else(|_| "4000000".to_string()) - .parse() - .expect("Invalid value for 'read_chunk_size'"); - let concurrent_files: usize = env::var("concurrent_files") - .unwrap_or_else(|_| "1".to_string()) - .parse() - .expect("Invalid value for 'concurrent_files'"); - let num_threads: usize = env::var("num_threads") - .unwrap_or_else(|_| NonZeroUsize::new(1).unwrap().to_string()) - .parse() - .expect("Invalid value for 'num_threads'"); - - println!("Resources directory: {}", resources_dir); - println!("Target directory: {}", target_dir); - println!("Chunk Size: {}", chunk_size); - println!("T Props Chunk Size: {}", t_props_chunk_size); - println!("Read Chunk Size: {}", read_chunk_size); - println!("Concurrent Files: {}", concurrent_files); - println!("Num Threads: {}", num_threads); - - let graph_dir: &str = &target_dir.to_string(); - - let parquet_dirs = vec![format!("{}", resources_dir)]; - - let layer_parquet_cols: Vec = vec![ParquetLayerCols { - parquet_dir: &parquet_dirs[0], - layer: "default", - src_col: "src", - dst_col: "dst", - time_col: "time", - }]; - - let graph = match measure_without_print_results("Graph load from dir", || { - ArrowGraph::load_from_dir(graph_dir) - }) { - Ok(g) => g, - Err(e) => { - println!("Failed to load saved graph. Attempting to load from parquet files..."); - measure_without_print_results("Graph load from parquets", || { - ArrowGraph::load_from_parquets( - graph_dir, - layer_parquet_cols, - None, - chunk_size, - t_props_chunk_size, - Some(read_chunk_size), - Some(concurrent_files), - num_threads, - ) - }) - .expect("Failed to load the graph from parquet files") - } - }; - - println!("Node count = {}", graph.count_nodes()); - println!("Edge count = {}", graph.count_edges()); - println!("Earliest time = {}", graph.earliest_time().unwrap()); - println!("Latest time = {}", graph.latest_time().unwrap()); - - fn iterate_all_edges(g: &ArrowGraph) { - for e in g.edges() { - e.edge.src().0; - } - } - - measure_without_print_results("Iterate All Edges", || iterate_all_edges(&graph)); - - measure_without_print_results("CC", || { - connected_components::connected_components(graph.as_ref()) - }); - measure_without_print_results("Weakly CC", || { - weakly_connected_components(&graph, 20, None) - }); - measure_without_print_results("Page Rank", || { - unweighted_page_rank(&graph, Some(100), Some(1), None, true, None) - }); - - measure_without_print_results("Static Multi Hop Query (1 node)", || { - static_multi_hop_single_node_query(&graph) - }); - measure_without_print_results("Static Multi Hop Query (100 node)", || { - static_multi_hop_multi_node_query(&graph) - }); - measure_without_print_results("Temporal Multi Hop Query (1 node)", || { - multi_hop_single_node_query(&graph) - }); - measure_without_print_results("Temporal Multi Hop Query (100 node)", || { - multi_hop_multi_node_query(&graph) - }); -} - -fn static_multi_hop_single_node_query(graph: &ArrowGraph) { - let nodes = vec![VID(3000)]; - - let (sender, receiver) = std::sync::mpsc::channel(); - let query = Query::new().out("default").out("default").channel([sender]); - - let result = rayon2::execute::(query, NodeSource::NodeIds(nodes), &graph, |_| NoState); - assert!(result.is_ok()); - - let _ = receiver.into_iter().collect::>(); -} - -fn static_multi_hop_multi_node_query(graph: &ArrowGraph) { - let nodes: Vec = 
(0..100).map(VID).collect(); - - let (sender, receiver) = std::sync::mpsc::channel(); - let query = Query::new() - .out_limit("default", 100) - .out_limit("default", 100) - .out_limit("default", 100) - .out_limit("default", 100) - .channel([sender]); - - let result = rayon2::execute::(query, NodeSource::NodeIds(nodes), &graph, |_| { - NoState::new() - }); - assert!(result.is_ok()); - - match receiver.recv() { - Ok((_, vid)) => { - println!("VID {}", vid.0); - } - Err(e) => { - println!("Error receiving result: {:?}", e); - } - } -} - -fn multi_hop_single_node_query(graph: &ArrowGraph) { - let nodes = vec![VID(0)]; - - let query: Query = Query::new() - .out_limit("default", 100) - .out_limit("default", 100) - .out_limit("default", 100) - .out_limit("default", 100) - .path("hop", |mut writer, state: ForwardState| { - serde_json::to_writer(&mut writer, &state.path).unwrap(); - write!(writer, "\n").unwrap(); - }); - - let result = - rayon2::execute::(query, NodeSource::NodeIds(nodes), graph, |node| { - let earliest = node.earliest(); - ForwardState::at_time(node, earliest, 100) - }); - - assert!(result.is_ok()); -} - -fn multi_hop_multi_node_query(graph: &ArrowGraph) { - let nodes: Vec = (0..100).map(VID).collect(); - - let query: Query = Query::new() - .out_limit("default", 100) - .out_limit("default", 100) - .out_limit("default", 100) - .out_limit("default", 100) - .path("hop", |mut writer, state: ForwardState| { - serde_json::to_writer(&mut writer, &state.path).unwrap(); - write!(writer, "\n").unwrap(); - }); - - let result = - rayon2::execute::(query, NodeSource::NodeIds(nodes), graph, |node| { - let earliest = node.earliest(); - ForwardState::at_time(node, earliest, 100) - }); - - assert!(result.is_ok()); -} diff --git a/examples/lanl/Cargo.toml b/examples/lanl/Cargo.toml deleted file mode 100644 index 7df71353b4..0000000000 --- a/examples/lanl/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "lanl" -version.workspace = true -documentation.workspace = true -repository.workspace = true -license.workspace = true -readme.workspace = true -homepage.workspace = true -keywords.workspace = true -authors.workspace = true -rust-version.workspace = true -edition.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -raphtory = { path = "../../raphtory", features = ["python"], version = "0.8.0" } -raphtory-arrow.workspace = true -raphtory-storage = { path = "../../raphtory-storage", version = "0.8.0" } -pyo3 = "0.20.0" diff --git a/examples/lanl/python/main.py b/examples/lanl/python/main.py deleted file mode 100644 index 4dda0dd653..0000000000 --- a/examples/lanl/python/main.py +++ /dev/null @@ -1,121 +0,0 @@ -import argparse -from raphtory import ArrowGraph -from raphtory.lanl import lanl_query1, lanl_query2, lanl_query3, lanl_query3b, lanl_query3c, lanl_query4, exfilteration_query1, exfilteration_count_query_total, exfiltration_list_query_count -from raphtory import algorithms -import time -from typing import TypeVar, Callable -import os -import sys - - -B = TypeVar('B') - -def measure(name: str, f: Callable[..., B], *args, print_result: bool = True) -> B: - start_time = time.time() - result = f(*args) - elapsed_time = time.time() - start_time - - time_unit = "s" - elapsed_time_display = elapsed_time - if elapsed_time < 1: - time_unit = "ms" - elapsed_time_display *= 1000 - - if print_result: - print(f"Running {name}: time: {elapsed_time_display:.3f}{time_unit}, result: {result}") - else: - print(f"Running 
{name}: time: {elapsed_time_display:.3f}{time_unit}") - - return result - - -def main(graph_dir, resources_dir, chunk_size, t_props_chunk_size, read_chunk_size, concurrent_files, num_threads): - - if resources_dir is None or graph_dir is None: - raise ValueError("Both 'resources_dir' and 'target_dir' environment variables must be set.") - - print(f"Resources directory: {resources_dir}") - print(f"Target directory: {graph_dir}") - print(f"Chunk size: {chunk_size}") - print(f"t_props chunk size: {t_props_chunk_size}") - print(f"Read chunk size: {read_chunk_size}") - print(f"Concurrent files: {concurrent_files}") - print(f"Number of threads: {num_threads}") - - layer_parquet_cols = [ - { - "parquet_dir": os.path.join(resources_dir, "nft_sorted"), - "layer": "netflow", - "src_col": "src", - "dst_col": "dst", - "time_col": "epoch_time", - }, - { - "parquet_dir": os.path.join(resources_dir, "v1_sorted"), - "layer": "events_1v", - "src_col": "src", - "dst_col": "dst", - "time_col": "epoch_time", - }, - { - "parquet_dir": os.path.join(resources_dir, "v2_sorted"), - "layer": "events_2v", - "src_col": "src", - "dst_col": "dst", - "time_col": "epoch_time", - } - ] - - # # Read the Parquet file - # table = pq.read_table(parquet_dir + '/part-00000-8b31eaa4-2bd9-4f07-b61c-a353aed2af22-c000.snappy.parquet') - # print(table.schema) - - try: - g = measure("Graph load from dir", ArrowGraph.load_from_dir, graph_dir, print_result=False) - except Exception as e: - g = measure( - "Graph load from parquets", - ArrowGraph.load_from_parquets, - graph_dir, - layer_parquet_cols, - None, - chunk_size, - t_props_chunk_size, - read_chunk_size, - concurrent_files, - num_threads, - print_result=False - ) - - print("Nodes count =", g.count_nodes()) - print("Edges count =", g.count_edges()) - print("Earliest time =", g.earliest_time) - print("Latest time =", g.latest_time) - - measure("Query 1", lanl_query1, g) - measure("Query 2", lanl_query2, g) - measure("Query 3", lanl_query3, g) - measure("Query 3b", lanl_query3b, g) - # assert(measure("Query 3c", lanl_query3c, g) == 0) - measure("Query 4", lanl_query4, g) - - measure("CC", algorithms.connected_components, g, print_result=False) - measure("Weakly CC Layer", algorithms.weakly_connected_components, g.layer("netflow"), 20, print_result=False) - measure("Weakly CC", algorithms.weakly_connected_components, g, 20, print_result=False) - measure("Page Rank", algorithms.pagerank, g, 100, print_result=False) - - measure("Exfilteration Query 1", exfilteration_query1, g) - measure("Exfilteration Count Query Total", exfilteration_count_query_total, g, 30) - measure("Exfilteration List Query Count", exfiltration_list_query_count, g, 30) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Run LANL example queries') - parser.add_argument('--graph-dir', type=str, help='The directory of the graph') - parser.add_argument('--resources-dir', type=str, help='paths to the parquet directory') - parser.add_argument('--chunk-size', type=int, default=268435456, help='Chunk size') - parser.add_argument('--t-props-chunk-size', type=int, default=20000000, help='t_props chunk size') - parser.add_argument('--read-chunk-size', type=int, default=4000000, help='Read chunk size') - parser.add_argument('--concurrent-files', type=int, default=1, help='Concurrent files') - parser.add_argument('--num-threads', type=int, default=4, help='Number of threads') - args = parser.parse_args() - main(args.graph_dir, args.resources_dir, args.chunk_size, args.t_props_chunk_size, 
args.read_chunk_size, args.concurrent_files, args.num_threads) diff --git a/examples/lanl/src/main.rs b/examples/lanl/src/main.rs deleted file mode 100644 index cf39291b32..0000000000 --- a/examples/lanl/src/main.rs +++ /dev/null @@ -1,139 +0,0 @@ -use raphtory::{ - algorithms::{ - centrality::pagerank::unweighted_page_rank, components::weakly_connected_components, - }, - arrow::graph_impl::{ArrowGraph, ParquetLayerCols}, - prelude::{LayerOps, *}, -}; -use raphtory_arrow::algorithms::connected_components; -use raphtory_storage::lanl::{exfiltration, *}; -use std::{env, num::NonZeroUsize, vec}; - -fn main() { - // Retrieve named parameters from environment variables - let resources_dir = - env::var("resources_dir").expect("Environment variable 'resources_dir' not set"); - let target_dir = env::var("target_dir").expect("Environment variable 'target_dir' not set"); - - // Set default values and then try to override them with environment variables if they exist - let chunk_size: usize = env::var("chunk_size") - .unwrap_or_else(|_| "268435456".to_string()) - .parse() - .expect("Invalid value for 'chunk_size'"); - let t_props_chunk_size: usize = env::var("t_props_chunk_size") - .unwrap_or_else(|_| "20000000".to_string()) - .parse() - .expect("Invalid value for 't_props_chunk_size'"); - let read_chunk_size: usize = env::var("read_chunk_size") - .unwrap_or_else(|_| "4000000".to_string()) - .parse() - .expect("Invalid value for 'read_chunk_size'"); - let concurrent_files: usize = env::var("concurrent_files") - .unwrap_or_else(|_| "1".to_string()) - .parse() - .expect("Invalid value for 'concurrent_files'"); - let num_threads: usize = env::var("num_threads") - .unwrap_or_else(|_| NonZeroUsize::new(1).unwrap().to_string()) - .parse() - .expect("Invalid value for 'num_threads'"); - - println!("Resources directory: {}", resources_dir); - println!("Target directory: {}", target_dir); - println!("Chunk Size: {}", chunk_size); - println!("T Props Chunk Size: {}", t_props_chunk_size); - println!("Read Chunk Size: {}", read_chunk_size); - println!("Concurrent Files: {}", concurrent_files); - println!("Num Threads: {}", num_threads); - - let graph_dir: &str = &target_dir.to_string(); - - let parquet_dirs = vec![ - format!("{}/netflowsorted/nft_sorted", resources_dir), - format!("{}/netflowsorted/v1_sorted", resources_dir), - format!("{}/netflowsorted/v2_sorted", resources_dir), - ]; - - let layer_parquet_cols: Vec = vec![ - ParquetLayerCols { - parquet_dir: &parquet_dirs[0], - layer: "netflow", - src_col: "src", - dst_col: "dst", - time_col: "epoch_time", - }, - ParquetLayerCols { - parquet_dir: &parquet_dirs[1], - layer: "events_1v", - src_col: "src", - dst_col: "dst", - time_col: "epoch_time", - }, - ParquetLayerCols { - parquet_dir: &parquet_dirs[2], - layer: "events_2v", - src_col: "src", - dst_col: "dst", - time_col: "epoch_time", - }, - ]; - - let graph = match measure_without_print_results("Graph load from dir", || { - ArrowGraph::load_from_dir(graph_dir) - }) { - Ok(g) => g, - Err(e) => { - println!("Failed to load saved graph. 
Attempting to load from parquet files..."); - measure_without_print_results("Graph load from parquets", || { - ArrowGraph::load_from_parquets( - graph_dir, - layer_parquet_cols, - None, - chunk_size, - t_props_chunk_size, - Some(read_chunk_size), - Some(concurrent_files), - num_threads, - ) - }) - .expect("Failed to load the graph from parquet files") - } - }; - - println!("Node count = {}", graph.count_nodes()); - println!("Edge count = {}", graph.count_edges()); - println!("Earliest time = {}", graph.earliest_time().unwrap()); - println!("Latest time = {}", graph.latest_time().unwrap()); - - measure_with_print_results("Query 1", || query1::run(graph.as_ref()).unwrap()); - measure_with_print_results("Query 2", || query2::run(graph.as_ref()).unwrap()); - measure_with_print_results("Query 3", || query3::run(graph.as_ref()).unwrap()); - measure_with_print_results("Query 3b", || query3b::run(graph.as_ref()).unwrap()); - // # measure_with_print_results("Query 3c", || query3c::run(graph.as_ref()).unwrap()); - measure_with_print_results("Query 4", || query4::run2(graph.as_ref()).unwrap()); - - measure_without_print_results("CC", || { - connected_components::connected_components(graph.as_ref()) - }); - measure_without_print_results("Weakly CC", || { - weakly_connected_components(&graph.valid_layers("netflow"), 20, None) - }); - measure_without_print_results("Page Rank", || { - unweighted_page_rank( - &graph.valid_layers("netflow"), - Some(100), - Some(1), - None, - true, - None, - ) - }); - measure_with_print_results("Exfilteration Query 1", || { - exfiltration::query1::run(graph.as_ref()) - }); - measure_with_print_results("Exfilteration Count Query Total", || { - exfiltration::count::query_total(graph.as_ref(), 30) - }); - measure_with_print_results("Exfilteration List Query Count", || { - exfiltration::list::query_count(graph.as_ref(), 30) - }); -} diff --git a/python/Cargo.toml b/python/Cargo.toml index ddec9e255a..0c628d5004 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -21,7 +21,6 @@ crate-type = ["cdylib"] pyo3 = { workspace = true } raphtory_core = { path = "../raphtory", version = "0.8.1", features = ["python", "search", "vectors"], package = "raphtory" } raphtory-graphql = { path = "../raphtory-graphql", version = "0.8.1" } -raphtory-storage = { path = "../raphtory-storage", version = "0.8.1" } serde_json = { workspace = true } reqwest = { workspace = true } tokio = { workspace = true } @@ -32,9 +31,10 @@ dynamic-graphql = { workspace = true } itertools = { workspace = true } [features] +default = ["extension-module"] arrow = ["raphtory_core/arrow"] extension-module = ["pyo3/extension-module"] -default = ["extension-module"] + [build-dependencies] pyo3-build-config = { workspace = true } diff --git a/python/python/raphtory/__init__.py b/python/python/raphtory/__init__.py index d67a30c2d3..435a67c07b 100644 --- a/python/python/raphtory/__init__.py +++ b/python/python/raphtory/__init__.py @@ -12,11 +12,6 @@ except Exception as e: print(e) -try: - sys.modules["raphtory.lanl"] = lanl -except Exception as e: - print(e) - from .nullmodels import * from .plottingutils import * diff --git a/python/src/lib.rs b/python/src/lib.rs index 3d159efea0..ac67f953df 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -4,6 +4,8 @@ extern crate core; use graphql::*; use pyo3::prelude::*; +#[cfg(feature = "arrow")] +use raphtory_core::python::graph::arrow::{PyArrowGraph, PyGraphQuery, PyState}; use raphtory_core::python::{ graph::{ algorithm_result::AlgorithmResult, @@ -23,10 +25,6 @@ use 
raphtory_core::python::{ }, types::wrappers::document::PyDocument, }; -use raphtory_storage::python::packages::algorithms::*; - -#[cfg(feature = "arrow")] -use raphtory_core::python::graph::arrow::{PyArrowGraph, PyGraphQuery, PyState}; macro_rules! add_functions { ($module:expr, $($func:ident),* $(,)?) => { @@ -103,7 +101,6 @@ fn raphtory(py: Python<'_>, m: &PyModule) -> PyResult<()> { temporally_reachable_nodes, local_clustering_coefficient, weakly_connected_components, - connected_components, strongly_connected_components, in_components, out_components, @@ -118,6 +115,10 @@ fn raphtory(py: Python<'_>, m: &PyModule) -> PyResult<()> { fruchterman_reingold, cohesive_fruchterman_reingold, ); + + #[cfg(feature = "arrow")] + add_functions!(algorithm_module, connected_components,); + m.add_submodule(algorithm_module)?; // let usecase_algorithm_module = PyModule::new(py, "usecase_algorithms")?; @@ -153,24 +154,5 @@ fn raphtory(py: Python<'_>, m: &PyModule) -> PyResult<()> { add_functions!(vectors_module, generate_property_list); m.add_submodule(vectors_module)?; - // LANL ALGORITHMS - #[cfg(feature = "arrow")] - let lanl_module = PyModule::new(py, "lanl")?; - #[cfg(feature = "arrow")] - add_functions!( - lanl_module, - lanl_query1, - lanl_query2, - lanl_query3, - lanl_query3b, - lanl_query3c, - lanl_query4, - exfilteration_query1, - exfilteration_count_query_total, - exfiltration_list_query_count - ); - #[cfg(feature = "arrow")] - m.add_submodule(lanl_module)?; - Ok(()) } diff --git a/python/tests/test_arrowgraph.py b/python/tests/test_arrowgraph.py index e681876b1e..3af535cb91 100644 --- a/python/tests/test_arrowgraph.py +++ b/python/tests/test_arrowgraph.py @@ -1,15 +1,4 @@ from raphtory import ArrowGraph -from raphtory.lanl import ( - lanl_query1, - lanl_query2, - lanl_query3, - lanl_query3b, - lanl_query3c, - lanl_query4, - exfilteration_query1, - exfilteration_count_query_total, - exfiltration_list_query_count, -) from raphtory import algorithms from utils import measure import tempfile @@ -85,16 +74,6 @@ def test_arrow_graph(): assert g.earliest_time == 7257601 assert g.latest_time == 7343985 - assert measure("Query 1", lanl_query1, g) == 0 - assert measure("Query 2", lanl_query2, g) == 0 - assert measure("Query 3", lanl_query3, g) == 0 - assert measure("Query 3b", lanl_query3b, g) == 0 - # assert(measure("Query 3c", lanl_query3c, g) == 0) - assert measure("Query 4", lanl_query4, g) == 0 - - # assert (measure("CC", algorithms.connected_components, g, print_result=False)[:10] == [0, 1, 2, 3, 4, 5, 6, 7, 8, - # 9]) - actual = measure( "Weakly CC Layer", algorithms.weakly_connected_components, @@ -112,15 +91,3 @@ def test_arrow_graph(): "Page Rank", algorithms.pagerank, g.layer("netflow"), 100, print_result=False ) assert len(list(actual.get_all_with_names())) == 1624 - - assert measure("Exfilteration Query 1", exfilteration_query1, g) == 0 - assert ( - measure( - "Exfilteration Count Query Total", exfilteration_count_query_total, g, 30 - ) - == 0 - ) - assert ( - measure("Exfilteration List Query Count", exfiltration_list_query_count, g, 30) - == 0 - ) diff --git a/raphtory-arrow/Cargo.toml b/raphtory-arrow/Cargo.toml new file mode 100644 index 0000000000..6eec84f04a --- /dev/null +++ b/raphtory-arrow/Cargo.toml @@ -0,0 +1,4 @@ +[package] +name = "raphtory-arrow" +version = "0.8.1" +[dependencies] diff --git a/raphtory-arrow/src/lib.rs b/raphtory-arrow/src/lib.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/raphtory-arrow/src/lib.rs @@ -0,0 +1 @@ + diff --git 
a/raphtory-benchmark/Cargo.toml b/raphtory-benchmark/Cargo.toml index 2e4da4be2b..c5ac34388d 100644 --- a/raphtory-benchmark/Cargo.toml +++ b/raphtory-benchmark/Cargo.toml @@ -7,13 +7,19 @@ edition = "2021" [dependencies] criterion = { workspace = true } -raphtory = { path = "../raphtory", features = ["io", "arrow"] } +raphtory = { path = "../raphtory", features = ["io"] } +raphtory-graphql = { path = "../raphtory-graphql", version = "0.8.1" } +raphtory-arrow = { path = "../raphtory-arrow", version = "0.8.1" } sorted_vector_map = { workspace = true } rand = { workspace = true } rayon = { workspace = true } polars-arrow = { workspace = true } tempfile = { workspace = true } -raphtory-arrow = { workspace = true } +chrono = { workspace = true } +clap = { workspace = true } +csv = { workspace = true } +flate2 = { workspace = true } +tokio = { workspace = true } [[bench]] name = "tgraph_benchmarks" @@ -39,6 +45,6 @@ harness = false name = "edge_add" harness = false -[[bench]] -name = "arrow_algobench" -harness = false +# [[bench]] +# name = "arrow_algobench" +# harness = false diff --git a/comparison-benchmark/graphql-benchmark/benchmark.yml b/raphtory-benchmark/bin/benchmark.yml similarity index 100% rename from comparison-benchmark/graphql-benchmark/benchmark.yml rename to raphtory-benchmark/bin/benchmark.yml diff --git a/comparison-benchmark/graphql-benchmark/generate_graph.py b/raphtory-benchmark/bin/generate_graph.py similarity index 100% rename from comparison-benchmark/graphql-benchmark/generate_graph.py rename to raphtory-benchmark/bin/generate_graph.py diff --git a/comparison-benchmark/graphql-benchmark/hooks.js b/raphtory-benchmark/bin/hooks.js similarity index 100% rename from comparison-benchmark/graphql-benchmark/hooks.js rename to raphtory-benchmark/bin/hooks.js diff --git a/comparison-benchmark/graphql-benchmark/names.csv b/raphtory-benchmark/bin/names.csv similarity index 100% rename from comparison-benchmark/graphql-benchmark/names.csv rename to raphtory-benchmark/bin/names.csv diff --git a/comparison-benchmark/graphql-benchmark/readme.md b/raphtory-benchmark/bin/readme.md similarity index 100% rename from comparison-benchmark/graphql-benchmark/readme.md rename to raphtory-benchmark/bin/readme.md diff --git a/comparison-benchmark/graphql-benchmark/run_graphql.py b/raphtory-benchmark/bin/run_graphql.py similarity index 100% rename from comparison-benchmark/graphql-benchmark/run_graphql.py rename to raphtory-benchmark/bin/run_graphql.py diff --git a/comparison-benchmark/graphql-benchmark/run_server/src/main.rs b/raphtory-benchmark/bin/run_server.rs similarity index 100% rename from comparison-benchmark/graphql-benchmark/run_server/src/main.rs rename to raphtory-benchmark/bin/run_server.rs diff --git a/comparison-benchmark/python/benchmark_base.py b/raphtory-benchmark/python/benchmark_base.py similarity index 100% rename from comparison-benchmark/python/benchmark_base.py rename to raphtory-benchmark/python/benchmark_base.py diff --git a/comparison-benchmark/python/benchmark_driver.py b/raphtory-benchmark/python/benchmark_driver.py similarity index 100% rename from comparison-benchmark/python/benchmark_driver.py rename to raphtory-benchmark/python/benchmark_driver.py diff --git a/comparison-benchmark/python/benchmark_imports.py b/raphtory-benchmark/python/benchmark_imports.py similarity index 100% rename from comparison-benchmark/python/benchmark_imports.py rename to raphtory-benchmark/python/benchmark_imports.py diff --git a/comparison-benchmark/python/cozo_bench.py 
b/raphtory-benchmark/python/cozo_bench.py similarity index 100% rename from comparison-benchmark/python/cozo_bench.py rename to raphtory-benchmark/python/cozo_bench.py diff --git a/comparison-benchmark/python/graphtool_bench.py b/raphtory-benchmark/python/graphtool_bench.py similarity index 100% rename from comparison-benchmark/python/graphtool_bench.py rename to raphtory-benchmark/python/graphtool_bench.py diff --git a/comparison-benchmark/python/kuzu_bench.py b/raphtory-benchmark/python/kuzu_bench.py similarity index 100% rename from comparison-benchmark/python/kuzu_bench.py rename to raphtory-benchmark/python/kuzu_bench.py diff --git a/comparison-benchmark/python/memgraph_bench.py b/raphtory-benchmark/python/memgraph_bench.py similarity index 100% rename from comparison-benchmark/python/memgraph_bench.py rename to raphtory-benchmark/python/memgraph_bench.py diff --git a/comparison-benchmark/python/neo4j_bench.py b/raphtory-benchmark/python/neo4j_bench.py similarity index 100% rename from comparison-benchmark/python/neo4j_bench.py rename to raphtory-benchmark/python/neo4j_bench.py diff --git a/comparison-benchmark/python/networkx_bench.py b/raphtory-benchmark/python/networkx_bench.py similarity index 100% rename from comparison-benchmark/python/networkx_bench.py rename to raphtory-benchmark/python/networkx_bench.py diff --git a/comparison-benchmark/python/profile_bench.py b/raphtory-benchmark/python/profile_bench.py similarity index 100% rename from comparison-benchmark/python/profile_bench.py rename to raphtory-benchmark/python/profile_bench.py diff --git a/comparison-benchmark/python/raphtory_bench.py b/raphtory-benchmark/python/raphtory_bench.py similarity index 100% rename from comparison-benchmark/python/raphtory_bench.py rename to raphtory-benchmark/python/raphtory_bench.py diff --git a/comparison-benchmark/readme.md b/raphtory-benchmark/readme.md similarity index 100% rename from comparison-benchmark/readme.md rename to raphtory-benchmark/readme.md diff --git a/comparison-benchmark/rust/raphtory-rust-benchmark/src/main.rs b/raphtory-benchmark/src/main.rs similarity index 100% rename from comparison-benchmark/rust/raphtory-rust-benchmark/src/main.rs rename to raphtory-benchmark/src/main.rs diff --git a/raphtory-cypher/Cargo.toml b/raphtory-cypher/Cargo.toml index 76c9a23ebb..1ac40e906b 100644 --- a/raphtory-cypher/Cargo.toml +++ b/raphtory-cypher/Cargo.toml @@ -15,7 +15,7 @@ edition.workspace = true [dependencies] raphtory = { path = "../raphtory" } -raphtory-arrow.workspace = true +raphtory-arrow = { path = "../raphtory-arrow", optional = true, version = "0.8.1"} arrow.workspace = true arrow-buffer.workspace = true arrow-schema.workspace = true @@ -45,3 +45,6 @@ tempfile.workspace = true rand.workspace = true tokio.workspace = true clap.workspace = true + +[features] +arrow = ["raphtory/arrow", "raphtory-arrow"] diff --git a/raphtory-cypher/examples/raphtory_cypher.rs b/raphtory-cypher/examples/raphtory_cypher.rs index 4a02797ba1..0af6f5d5be 100644 --- a/raphtory-cypher/examples/raphtory_cypher.rs +++ b/raphtory-cypher/examples/raphtory_cypher.rs @@ -1,184 +1,200 @@ -use std::{error::Error, str::FromStr}; - -use arrow::util::pretty::print_batches; -use clap::Parser; -use futures::{stream, StreamExt}; -use raphtory::arrow::graph_impl::{ArrowGraph, ParquetLayerCols}; -use raphtory_cypher::{run_cypher, run_cypher_to_streams, run_sql}; -use serde::{de::DeserializeOwned, Deserialize}; - -/// Query graph with cypher -#[derive(Parser, Debug)] -#[command(version, about, long_about = 
None)] -struct CypherQuery { - /// Cypher query to run - #[arg(short, long)] - query: String, - - /// Graph path on disk - #[arg(short, long)] - graph_dir: String, - - /// Print the first batch of results - #[arg(short, long, default_value_t = false)] - print: bool, - - /// use sql instead of cypher - #[arg(short, long, default_value_t = false)] - sql: bool, -} +#[cfg(feature = "arrow")] +pub use cyper_arrow::*; -#[derive(Parser, Debug)] -#[command(version, about, long_about = None)] -struct LoadGraph { - /// Graph path on disk - #[arg(short, long)] - graph_dir: String, +#[cfg(not(feature = "arrow"))] +fn main() {} - /// Chunk size of the adjacency list - #[arg(short, long, default_value_t = 1000000)] - chunk_size: usize, +#[cfg(feature = "arrow")] +#[tokio::main] +async fn main() { + cyper_arrow::main().await; +} - /// Chunk size of the edge property list - #[arg(short, long, default_value_t = 5000000)] - t_prop_chunk_size: usize, +#[cfg(feature = "arrow")] +mod cyper_arrow { + use std::{error::Error, str::FromStr}; + + use arrow::util::pretty::print_batches; + use clap::Parser; + use futures::{stream, StreamExt}; + use raphtory::arrow::graph_impl::{ArrowGraph, ParquetLayerCols}; + use raphtory_cypher::{run_cypher, run_cypher_to_streams, run_sql}; + use serde::{de::DeserializeOwned, Deserialize}; + + /// Query graph with cypher + #[derive(Parser, Debug)] + #[command(version, about, long_about = None)] + struct CypherQuery { + /// Cypher query to run + #[arg(short, long)] + query: String, + + /// Graph path on disk + #[arg(short, long)] + graph_dir: String, + + /// Print the first batch of results + #[arg(short, long, default_value_t = false)] + print: bool, + + /// use sql instead of cypher + #[arg(short, long, default_value_t = false)] + sql: bool, + } - /// Chunk size of the parquet edge list - #[arg(short, long)] - read_chunk_size: Option, + #[derive(Parser, Debug)] + #[command(version, about, long_about = None)] + struct LoadGraph { + /// Graph path on disk + #[arg(short, long)] + graph_dir: String, - /// Number of threads to use when loading temporal properties - #[arg(short, long, default_value_t = 8)] - num_threads: usize, + /// Chunk size of the adjacency list + #[arg(short, long, default_value_t = 1000000)] + chunk_size: usize, - /// Number of concurrent files to read when loading temporal properties - #[arg(short, long)] - concurrent_files: Option, + /// Chunk size of the edge property list + #[arg(short, long, default_value_t = 5000000)] + t_prop_chunk_size: usize, - /// Node properties to load - #[arg(short, long)] - node_props: Option, + /// Chunk size of the parquet edge list + #[arg(short, long)] + read_chunk_size: Option, - /// Edge list parquet files to load as layers - #[arg(short='l', last = true, value_parser = parse_key_val::)] - layers: Vec<(String, ArgLayer)>, -} + /// Number of threads to use when loading temporal properties + #[arg(short, long, default_value_t = 8)] + num_threads: usize, -#[derive(thiserror::Error, Debug, PartialEq)] -enum Err { - #[error("{0}")] - Message(String), -} + /// Number of concurrent files to read when loading temporal properties + #[arg(short, long)] + concurrent_files: Option, -fn parse_key_val(s: &str) -> Result<(T, U), Err> -where - T: std::str::FromStr, - U: std::str::FromStr, - ::Err: std::fmt::Display, -{ - let pos = s - .find('=') - .ok_or_else(|| Err::Message(format!("invalid KEY=value: no `=` found in `{s}`")))?; - let json = &s[pos + 1..]; - let arg_layer: U = serde_json::from_str(json) - .map_err(|e| 
Err::Message(format!("Failed to parse json: {}", e)))?; - let layer = s[..pos] - .parse() - .map_err(|e| Err::Message(format!("Failed to parse key: {}", e)))?; - Ok((layer, arg_layer)) -} + /// Node properties to load + #[arg(short, long)] + node_props: Option, -#[derive(Clone, Debug, Deserialize)] -struct ArgLayer { - path: String, - src_col: String, - dst_col: String, - time_col: String, -} + /// Edge list parquet files to load as layers + #[arg(short='l', last = true, value_parser = parse_key_val::)] + layers: Vec<(String, ArgLayer)>, + } -impl std::str::FromStr for ArgLayer { - type Err = Box; + #[derive(thiserror::Error, Debug, PartialEq)] + enum Err { + #[error("{0}")] + Message(String), + } - fn from_str(s: &str) -> Result { - let layer: ArgLayer = serde_json::from_str(s)?; - Ok(layer) + fn parse_key_val(s: &str) -> Result<(T, U), Err> + where + T: std::str::FromStr, + U: std::str::FromStr, + ::Err: std::fmt::Display, + { + let pos = s + .find('=') + .ok_or_else(|| Err::Message(format!("invalid KEY=value: no `=` found in `{s}`")))?; + let json = &s[pos + 1..]; + let arg_layer: U = serde_json::from_str(json) + .map_err(|e| Err::Message(format!("Failed to parse json: {}", e)))?; + let layer = s[..pos] + .parse() + .map_err(|e| Err::Message(format!("Failed to parse key: {}", e)))?; + Ok((layer, arg_layer)) } -} -/// Query graph with cypher -#[derive(Parser, Debug)] -#[command(version, about, long_about = None)] -enum Args { - Query(CypherQuery), - Load(LoadGraph), -} + #[derive(Clone, Debug, Deserialize)] + struct ArgLayer { + path: String, + src_col: String, + dst_col: String, + time_col: String, + } -#[tokio::main] -async fn main() { - let args = Args::parse(); + impl std::str::FromStr for ArgLayer { + type Err = Box; + + fn from_str(s: &str) -> Result { + let layer: ArgLayer = serde_json::from_str(s)?; + Ok(layer) + } + } - match args { - Args::Query(args) => { - let graph = ArrowGraph::load_from_dir(&args.graph_dir).expect("Failed to load graph"); + /// Query graph with cypher + #[derive(Parser, Debug)] + #[command(version, about, long_about = None)] + enum Args { + Query(CypherQuery), + Load(LoadGraph), + } - let now = std::time::Instant::now(); + // #[tokio::main] + pub async fn main() { + let args = Args::parse(); - if args.print { - let df = if args.sql { - run_sql(&args.query, &graph).await.unwrap() - } else { - run_cypher(&args.query, &graph, true).await.unwrap() - }; + match args { + Args::Query(args) => { + let graph = + ArrowGraph::load_from_dir(&args.graph_dir).expect("Failed to load graph"); let now = std::time::Instant::now(); - let batches = df.collect().await.unwrap(); - println!("Query execution time: {:?}", now.elapsed()); - print_batches(&batches).expect("Failed to print batches"); - } else { - let streams = run_cypher_to_streams(&args.query, &graph).await.unwrap(); - let num_rows = stream::iter(streams) - .flatten() - .filter_map(|batch| async move { batch.ok() }) - .map(|batch| batch.num_rows()) - .fold(0, |acc, x| async move { acc + x }) - .await; - println!("Query execution: {:?}, num_rows: {num_rows}", now.elapsed()); + + if args.print { + let df = if args.sql { + run_sql(&args.query, &graph).await.unwrap() + } else { + run_cypher(&args.query, &graph, true).await.unwrap() + }; + + let now = std::time::Instant::now(); + let batches = df.collect().await.unwrap(); + println!("Query execution time: {:?}", now.elapsed()); + print_batches(&batches).expect("Failed to print batches"); + } else { + let streams = run_cypher_to_streams(&args.query, 
&graph).await.unwrap(); + let num_rows = stream::iter(streams) + .flatten() + .filter_map(|batch| async move { batch.ok() }) + .map(|batch| batch.num_rows()) + .fold(0, |acc, x| async move { acc + x }) + .await; + println!("Query execution: {:?}, num_rows: {num_rows}", now.elapsed()); + } } - } - Args::Load(args) => { - let layers = args.layers; - let layer_parquet_cols = (0..layers.len()) - .map(|layer_id| { - let ( - layer, - ArgLayer { - path, - src_col, - dst_col, - time_col, - }, - ) = &layers[layer_id]; - ParquetLayerCols { - parquet_dir: &path, - layer: &layer, - src_col: &src_col, - dst_col: &dst_col, - time_col: &time_col, - } - }) - .collect(); - ArrowGraph::load_from_parquets( - args.graph_dir.as_str(), - layer_parquet_cols, - args.node_props.as_deref(), - args.chunk_size, - args.t_prop_chunk_size, - args.read_chunk_size, - args.concurrent_files, - args.num_threads, - ) - .expect("Failed to load graph"); + Args::Load(args) => { + let layers = args.layers; + let layer_parquet_cols = (0..layers.len()) + .map(|layer_id| { + let ( + layer, + ArgLayer { + path, + src_col, + dst_col, + time_col, + }, + ) = &layers[layer_id]; + ParquetLayerCols { + parquet_dir: &path, + layer: &layer, + src_col: &src_col, + dst_col: &dst_col, + time_col: &time_col, + } + }) + .collect(); + ArrowGraph::load_from_parquets( + args.graph_dir.as_str(), + layer_parquet_cols, + args.node_props.as_deref(), + args.chunk_size, + args.t_prop_chunk_size, + args.read_chunk_size, + args.concurrent_files, + args.num_threads, + ) + .expect("Failed to load graph"); + } } } } diff --git a/raphtory-cypher/src/lib.rs b/raphtory-cypher/src/lib.rs index d1538b9206..e9d76c028c 100644 --- a/raphtory-cypher/src/lib.rs +++ b/raphtory-cypher/src/lib.rs @@ -1,306 +1,251 @@ -use arrow::compute::take; -use std::sync::Arc; - -use arrow_array::{builder, Array, RecordBatch, UInt64Array}; -use arrow_schema::{ArrowError, DataType}; -use datafusion::{ - dataframe::DataFrame, - error::DataFusionError, - execution::{ - config::SessionConfig, - context::{SQLOptions, SessionContext, SessionState}, - runtime_env::RuntimeEnv, - }, - logical_expr::{create_udf, ColumnarValue, LogicalPlan, Volatility}, - physical_plan::SendableRecordBatchStream, -}; - -use executor::{table_provider::edge::EdgeListTableProvider, ExecError}; -use parser::ast::*; -use raphtory::arrow::graph_impl::ArrowGraph; - -use crate::{ - executor::table_provider::node::NodeTableProvider, - hop::rule::{HopQueryPlanner, HopRule}, -}; +#[cfg(feature = "arrow")] +pub use cypher_arrow::*; +#[cfg(feature = "arrow")] pub mod executor; +#[cfg(feature = "arrow")] pub mod hop; +#[cfg(feature = "arrow")] pub mod parser; +#[cfg(feature = "arrow")] pub mod transpiler; -pub use polars_arrow as arrow2; - -pub async fn run_cypher( - query: &str, - g: &ArrowGraph, - enable_hop_optim: bool, -) -> Result { - let (ctx, plan) = prepare_plan(query, g, enable_hop_optim).await?; - let df = ctx.execute_logical_plan(plan).await?; - Ok(df) -} - -pub async fn prepare_plan( - query: &str, - g: &ArrowGraph, - enable_hop_optim: bool, -) -> Result<(SessionContext, LogicalPlan), ExecError> { - // println!("Running query: {:?}", query); - let query = parser::parse_cypher(query)?; - - let config = SessionConfig::from_env()?.with_information_schema(true); - - // config.options_mut().optimizer.skip_failed_rules = true; - // config.options_mut().optimizer.top_down_join_key_reordering = false; - - let runtime = Arc::new(RuntimeEnv::default()); - let state = if enable_hop_optim { - 
SessionState::new_with_config_rt(config, runtime) - .with_query_planner(Arc::new(HopQueryPlanner {})) - .add_optimizer_rule(Arc::new(HopRule::new(g.clone()))) - } else { - SessionState::new_with_config_rt(config, runtime) +#[cfg(feature = "arrow")] +mod cypher_arrow { + use arrow::compute::take; + use std::sync::Arc; + + use arrow_array::{builder, Array, RecordBatch, UInt64Array}; + use arrow_schema::{ArrowError, DataType}; + use datafusion::{ + dataframe::DataFrame, + error::DataFusionError, + execution::{ + config::SessionConfig, + context::{SQLOptions, SessionContext, SessionState}, + runtime_env::RuntimeEnv, + }, + logical_expr::{create_udf, ColumnarValue, LogicalPlan, Volatility}, + physical_plan::SendableRecordBatchStream, }; - let ctx = SessionContext::new_with_state(state); - - let graph = g.as_ref(); - for layer in graph.layer_names() { - let edge_list_table = EdgeListTableProvider::new(layer, g.clone())?; - ctx.register_table(layer, Arc::new(edge_list_table))?; - } - - let node_table_provider = NodeTableProvider::new(g.clone())?; - ctx.register_table("nodes", Arc::new(node_table_provider))?; - let layer_names = graph.layer_names().to_vec(); - - ctx.register_udf(create_udf( - "type", - vec![DataType::UInt64], - DataType::Utf8.into(), - Volatility::Immutable, - Arc::new(move |cols| { - let layer_id_col = match &cols[0] { - ColumnarValue::Array(a) => a.clone(), - ColumnarValue::Scalar(a) => a.to_array()?, - }; - let layer_id_col = layer_id_col - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Execution("Expected column of type u64".to_string()) - })?; - - let mut type_col = builder::StringBuilder::new(); - for layer_id in layer_id_col.values() { - let layer_name = layer_names - .get(*layer_id as usize) - .ok_or_else(|| DataFusionError::Execution("Layer not found".to_string()))?; - type_col.append_value(layer_name); - } - Ok(ColumnarValue::Array(Arc::new(type_col.finish()))) - }), - )); - ctx.refresh_catalogs().await?; - let query = transpiler::to_sql(query, g); - - // println!("SQL: {:?}", query.to_string()); - // println!("SQL AST: {:?}", query); - let plan = ctx - .state() - .statement_to_plan(datafusion::sql::parser::Statement::Statement(Box::new( - query, - ))) - .await?; - let opts = SQLOptions::new(); - opts.verify_plan(&plan)?; - - let plan = ctx.state().optimize(&plan)?; - // println!("PLAN! 
{:?}", plan); - Ok((ctx, plan)) -} + use super::{ + executor::{table_provider::edge::EdgeListTableProvider, ExecError}, + parser::ast::*, + *, + }; + use raphtory::arrow::graph_impl::ArrowGraph; -pub async fn run_cypher_to_streams( - query: &str, - graph: &ArrowGraph, -) -> Result, ExecError> { - let df = run_cypher(query, graph, true).await?; - let stream = df.execute_stream_partitioned().await?; - Ok(stream) -} + use crate::{ + executor::table_provider::node::NodeTableProvider, + hop::rule::{HopQueryPlanner, HopRule}, + }; -pub async fn run_sql(query: &str, graph: &ArrowGraph) -> Result { - let ctx = SessionContext::new(); + pub use polars_arrow as arrow2; - for layer in graph.as_ref().layer_names() { - let table = EdgeListTableProvider::new(layer, graph.clone())?; - ctx.register_table(layer, Arc::new(table))?; + pub async fn run_cypher( + query: &str, + g: &ArrowGraph, + enable_hop_optim: bool, + ) -> Result { + let (ctx, plan) = prepare_plan(query, g, enable_hop_optim).await?; + let df = ctx.execute_logical_plan(plan).await?; + Ok(df) } - let node_table_provider = NodeTableProvider::new(graph.clone())?; - ctx.register_table("nodes", Arc::new(node_table_provider))?; - - // let state = ctx.state(); - // let dialect = state.config().options().sql_parser.dialect.as_str(); - // let sql_ast = ctx.state().sql_to_statement(query, dialect)?; - // println!("SQL AST: {:?}", sql_ast); - - let df = ctx.sql(query).await?; - Ok(df) -} + pub async fn prepare_plan( + query: &str, + g: &ArrowGraph, + enable_hop_optim: bool, + ) -> Result<(SessionContext, LogicalPlan), ExecError> { + // println!("Running query: {:?}", query); + let query = super::parser::parse_cypher(query)?; + + let config = SessionConfig::from_env()?.with_information_schema(true); + + // config.options_mut().optimizer.skip_failed_rules = true; + // config.options_mut().optimizer.top_down_join_key_reordering = false; + + let runtime = Arc::new(RuntimeEnv::default()); + let state = if enable_hop_optim { + SessionState::new_with_config_rt(config, runtime) + .with_query_planner(Arc::new(HopQueryPlanner {})) + .add_optimizer_rule(Arc::new(HopRule::new(g.clone()))) + } else { + SessionState::new_with_config_rt(config, runtime) + }; + let ctx = SessionContext::new_with_state(state); -pub fn take_record_batch( - record_batch: &RecordBatch, - indices: &dyn Array, -) -> Result { - let columns = record_batch - .columns() - .iter() - .map(|c| take(c, indices, None)) - .collect::, _>>()?; - RecordBatch::try_new(record_batch.schema(), columns) -} + let graph = g.as_ref(); + for layer in graph.layer_names() { + let edge_list_table = EdgeListTableProvider::new(layer, g.clone())?; + ctx.register_table(layer, Arc::new(edge_list_table))?; + } -#[cfg(test)] -mod test { - use arrow::compute::concat_batches; - use std::path::Path; - - // FIXME: actually assert the tests below - // use pretty_assertions::assert_eq; - use arrow::util::pretty::print_batches; - use arrow_array::RecordBatch; - use tempfile::tempdir; - - use raphtory::{arrow::graph_impl::ArrowGraph, prelude::*}; - - use crate::run_cypher; - - lazy_static::lazy_static! 
-        static ref EDGES: Vec<(u64, u64, i64, f64)> = vec![
-            (0, 1, 1, 3.),
-            (0, 1, 2, 4.),
-            (0, 2, 0, 1.),
-            (1, 2, 2, 4.),
-            (1, 3, 3, 4.),
-            (1, 4, 1, 1.),
-            (3, 2, 5, 5.),
-            (3, 4, 1, 6.),
-            (3, 4, 3, 6.),
-            (3, 5, 7, 6.),
-            (4, 5, 9, 7.),
-        ];
-
-        static ref EDGES2: Vec<(u64, u64, i64, f64, String)> = vec![
-            (0, 1, 1, 3., "baa".to_string()),
-            (0, 2, 2, 7., "buu".to_string()),
-            (2, 3, 1, 9., "xbaa".to_string()),
-            (2, 3, 2, 1., "xbaa".to_string()),
-            (1, 0, 3, 4., "beea".to_string()),
-            (1, 0, 3, 1., "beex".to_string()),
-            (4, 1, 5, 5., "baaz".to_string()),
-            (4, 5, 1, 6., "bxx".to_string()),
-            (5, 6, 3, 6., "mbaa".to_string()),
-            (6, 4, 7, 8., "baa".to_string()),
-            (6, 4, 9, 7., "bzz".to_string()),
-        ];
-
-        // a star graph with 5 nodes and 4 edges
-        static ref EDGES3: Vec<(u64, u64, i64, f64)> = vec![
-            (0, 2, 2, 7.),
-            (0, 3, 3, 9.),
-            (0, 4, 4, 1.),
-            (1, 0, 1, 3.),
-        ];
-
-
-        // (id, name, age, city)
-        static ref NODES: Vec<(u64, String, i64, Option<String>)> = vec![
-            (0, "Alice", 30, None),
-            (1, "Bob", 25, Some( "Paris" )),
-            (2, "Charlie", 35, Some( "Berlin" )),
-            (3, "David", 40, None),
-            (4, "Eve", 45, Some( "London" )),
-            (5, "Frank", 50, Some( "Berlin" )),
-            (6, "Grace", 55, Some( "Paris" )),
-        ].into_iter().map(|(id, name, age, city)| {
-            (id, name.to_string(), age, city.map(|s| s.to_string()))
-        }).collect();
+        let node_table_provider = NodeTableProvider::new(g.clone())?;
+        ctx.register_table("nodes", Arc::new(node_table_provider))?;
+        let layer_names = graph.layer_names().to_vec();
+
+        ctx.register_udf(create_udf(
+            "type",
+            vec![DataType::UInt64],
+            DataType::Utf8.into(),
+            Volatility::Immutable,
+            Arc::new(move |cols| {
+                let layer_id_col = match &cols[0] {
+                    ColumnarValue::Array(a) => a.clone(),
+                    ColumnarValue::Scalar(a) => a.to_array()?,
+                };
+
+                let layer_id_col = layer_id_col
+                    .as_any()
+                    .downcast_ref::<UInt64Array>()
+                    .ok_or_else(|| {
+                        DataFusionError::Execution("Expected column of type u64".to_string())
+                    })?;
+
+                let mut type_col = builder::StringBuilder::new();
+                for layer_id in layer_id_col.values() {
+                    let layer_name = layer_names
+                        .get(*layer_id as usize)
+                        .ok_or_else(|| DataFusionError::Execution("Layer not found".to_string()))?;
+                    type_col.append_value(layer_name);
+                }
+                Ok(ColumnarValue::Array(Arc::new(type_col.finish())))
+            }),
+        ));
+        ctx.refresh_catalogs().await?;
+        let query = transpiler::to_sql(query, g);
+
+        // println!("SQL: {:?}", query.to_string());
+        // println!("SQL AST: {:?}", query);
+        let plan = ctx
+            .state()
+            .statement_to_plan(datafusion::sql::parser::Statement::Statement(Box::new(
+                query,
+            )))
+            .await?;
+        let opts = SQLOptions::new();
+        opts.verify_plan(&plan)?;
+
+        let plan = ctx.state().optimize(&plan)?;
+        // println!("PLAN! {:?}", plan);
+        Ok((ctx, plan))
     }
-    //TODO: need better way of testing these, since they run in parallel order of batches is non-deterministic
-    #[tokio::test]
-    async fn select_table() {
-        let graph_dir = tempdir().unwrap();
-        let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 3, 2);
-
-        let df = run_cypher("match ()-[e]->() RETURN *", &graph, true)
-            .await
-            .unwrap();
+    pub async fn run_cypher_to_streams(
+        query: &str,
+        graph: &ArrowGraph,
+    ) -> Result<Vec<SendableRecordBatchStream>, ExecError> {
+        let df = run_cypher(query, graph, true).await?;
+        let stream = df.execute_stream_partitioned().await?;
+        Ok(stream)
+    }
-        let data = df.collect().await.unwrap();
+    pub async fn run_sql(query: &str, graph: &ArrowGraph) -> Result<DataFrame, ExecError> {
+        let ctx = SessionContext::new();
-        print_batches(&data).expect("failed to print batches");
-    }
+        for layer in graph.as_ref().layer_names() {
+            let table = EdgeListTableProvider::new(layer, graph.clone())?;
+            ctx.register_table(layer, Arc::new(table))?;
+        }
-    #[tokio::test]
-    async fn select_table_order_by() {
-        let graph_dir = tempdir().unwrap();
-        let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 3, 2);
+        let node_table_provider = NodeTableProvider::new(graph.clone())?;
+        ctx.register_table("nodes", Arc::new(node_table_provider))?;
-        let df = run_cypher("match ()-[e]->() RETURN * ORDER by e.weight", &graph, true)
-            .await
-            .unwrap();
+        // let state = ctx.state();
+        // let dialect = state.config().options().sql_parser.dialect.as_str();
+        // let sql_ast = ctx.state().sql_to_statement(query, dialect)?;
+        // println!("SQL AST: {:?}", sql_ast);
-        let data = df.collect().await.unwrap();
+        let df = ctx.sql(query).await?;
+        Ok(df)
+    }
-        print_batches(&data).expect("failed to print batches");
+    pub fn take_record_batch(
+        record_batch: &RecordBatch,
+        indices: &dyn Array,
+    ) -> Result<RecordBatch, ArrowError> {
+        let columns = record_batch
+            .columns()
+            .iter()
+            .map(|c| take(c, indices, None))
+            .collect::<Result<Vec<_>, _>>()?;
+        RecordBatch::try_new(record_batch.schema(), columns)
     }
-    mod arrow2_load {
-        use std::path::PathBuf;
+    #[cfg(test)]
+    mod test {
+        use arrow::compute::concat_batches;
+        use std::path::Path;
-        use crate::arrow2::{
-            array::{PrimitiveArray, StructArray},
-            datatypes::*,
-        };
+        // FIXME: actually assert the tests below
+        // use pretty_assertions::assert_eq;
         use arrow::util::pretty::print_batches;
+        use arrow_array::RecordBatch;
         use tempfile::tempdir;
-        use raphtory::arrow::graph_impl::{ArrowGraph, ParquetLayerCols};
+        use raphtory::{arrow::graph_impl::ArrowGraph, prelude::*};
         use crate::run_cypher;
-        fn schema() -> ArrowSchema {
-            let srcs = Field::new("srcs", ArrowDataType::UInt64, false);
-            let dsts = Field::new("dsts", ArrowDataType::UInt64, false);
-            let time = Field::new("bla_time", ArrowDataType::Int64, false);
-            let weight = Field::new("weight", ArrowDataType::Float64, true);
-            ArrowSchema::from(vec![srcs, dsts, time, weight])
-        }
-
-        #[tokio::test]
-        async fn select_table_time_column_different_name() {
-            let graph_dir = tempdir().unwrap();
+        lazy_static::lazy_static! {
+            static ref EDGES: Vec<(u64, u64, i64, f64)> = vec![
+                (0, 1, 1, 3.),
+                (0, 1, 2, 4.),
+                (0, 2, 0, 1.),
+                (1, 2, 2, 4.),
+                (1, 3, 3, 4.),
+                (1, 4, 1, 1.),
+                (3, 2, 5, 5.),
+                (3, 4, 1, 6.),
+                (3, 4, 3, 6.),
+                (3, 5, 7, 6.),
+                (4, 5, 9, 7.),
+            ];
-            let srcs = PrimitiveArray::from_vec(vec![1u64, 2u64, 2u64, 2u64]).boxed();
-            let dsts = PrimitiveArray::from_vec(vec![3u64, 3u64, 4u64, 4u64]).boxed();
-            let time = PrimitiveArray::from_vec(vec![2i64, 3i64, 4i64, 5i64]).boxed();
-            let weight = PrimitiveArray::from_vec(vec![3.14f64, 4.14f64, 5.14f64, 6.14f64]).boxed();
+            static ref EDGES2: Vec<(u64, u64, i64, f64, String)> = vec![
+                (0, 1, 1, 3., "baa".to_string()),
+                (0, 2, 2, 7., "buu".to_string()),
+                (2, 3, 1, 9., "xbaa".to_string()),
+                (2, 3, 2, 1., "xbaa".to_string()),
+                (1, 0, 3, 4., "beea".to_string()),
+                (1, 0, 3, 1., "beex".to_string()),
+                (4, 1, 5, 5., "baaz".to_string()),
+                (4, 5, 1, 6., "bxx".to_string()),
+                (5, 6, 3, 6., "mbaa".to_string()),
+                (6, 4, 7, 8., "baa".to_string()),
+                (6, 4, 9, 7., "bzz".to_string()),
+            ];
-            let chunk = StructArray::new(
-                ArrowDataType::Struct(schema().fields),
-                vec![srcs, dsts, time, weight],
-                None,
-            );
+            // a star graph with 5 nodes and 4 edges
+            static ref EDGES3: Vec<(u64, u64, i64, f64)> = vec![
+                (0, 2, 2, 7.),
+                (0, 3, 3, 9.),
+                (0, 4, 4, 1.),
+                (1, 0, 1, 3.),
+            ];
-            // let node_gids = PrimitiveArray::from_vec((1u64..=4u64).collect()).boxed();
-            let edge_lists = vec![chunk];
+            // (id, name, age, city)
+            static ref NODES: Vec<(u64, String, i64, Option<String>)> = vec![
+                (0, "Alice", 30, None),
+                (1, "Bob", 25, Some( "Paris" )),
+                (2, "Charlie", 35, Some( "Berlin" )),
+                (3, "David", 40, None),
+                (4, "Eve", 45, Some( "London" )),
+                (5, "Frank", 50, Some( "Berlin" )),
+                (6, "Grace", 55, Some( "Paris" )),
+            ].into_iter().map(|(id, name, age, city)| {
+                (id, name.to_string(), age, city.map(|s| s.to_string()))
+            }).collect();
+        }
-            let graph =
-                ArrowGraph::load_from_edge_lists(&edge_lists, 20, 20, graph_dir, 0, 1, 2).unwrap();
+        //TODO: need better way of testing these, since they run in parallel order of batches is non-deterministic
+        #[tokio::test]
+        async fn select_table() {
+            let graph_dir = tempdir().unwrap();
+            let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 3, 2);
             let df = run_cypher("match ()-[e]->() RETURN *", &graph, true)
                 .await
@@ -312,439 +257,509 @@ mod test {
         }
         #[tokio::test]
-        async fn select_table_parquet_column_different_name() {
+        async fn select_table_order_by() {
            let graph_dir = tempdir().unwrap();
-            // relative to current dir back to parent dir then ./resource/netflowsorted
-            let netflow_layer_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
-                .parent()
-                .map(|p| p.join("raphtory/resources/test/netflow2.parquet"))
+            let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 3, 2);
+
+            let df = run_cypher("match ()-[e]->() RETURN * ORDER by e.weight", &graph, true)
+                .await
                .unwrap();
-            let v1_layer_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
-                .parent()
-                .map(|p| p.join("raphtory/resources/test/wls2.parquet"))
+            let data = df.collect().await.unwrap();
+
+            print_batches(&data).expect("failed to print batches");
+        }
+
+        mod arrow2_load {
+            use std::path::PathBuf;
+
+            use crate::arrow2::{
+                array::{PrimitiveArray, StructArray},
+                datatypes::*,
+            };
+            use arrow::util::pretty::print_batches;
+            use tempfile::tempdir;
+
+            use raphtory::arrow::graph_impl::{ArrowGraph, ParquetLayerCols};
+
+            use crate::run_cypher;
+
+            fn schema() -> ArrowSchema {
+                let srcs = Field::new("srcs", ArrowDataType::UInt64, false);
+                let dsts = 
Field::new("dsts", ArrowDataType::UInt64, false); + let time = Field::new("bla_time", ArrowDataType::Int64, false); + let weight = Field::new("weight", ArrowDataType::Float64, true); + ArrowSchema::from(vec![srcs, dsts, time, weight]) + } + + #[tokio::test] + async fn select_table_time_column_different_name() { + let graph_dir = tempdir().unwrap(); + + let srcs = PrimitiveArray::from_vec(vec![1u64, 2u64, 2u64, 2u64]).boxed(); + let dsts = PrimitiveArray::from_vec(vec![3u64, 3u64, 4u64, 4u64]).boxed(); + let time = PrimitiveArray::from_vec(vec![2i64, 3i64, 4i64, 5i64]).boxed(); + let weight = + PrimitiveArray::from_vec(vec![3.14f64, 4.14f64, 5.14f64, 6.14f64]).boxed(); + + let chunk = StructArray::new( + ArrowDataType::Struct(schema().fields), + vec![srcs, dsts, time, weight], + None, + ); + + // let node_gids = PrimitiveArray::from_vec((1u64..=4u64).collect()).boxed(); + + let edge_lists = vec![chunk]; + + let graph = + ArrowGraph::load_from_edge_lists(&edge_lists, 20, 20, graph_dir, 0, 1, 2) + .unwrap(); + + let df = run_cypher("match ()-[e]->() RETURN *", &graph, true) + .await + .unwrap(); + + let data = df.collect().await.unwrap(); + + print_batches(&data).expect("failed to print batches"); + } + + #[tokio::test] + async fn select_table_parquet_column_different_name() { + let graph_dir = tempdir().unwrap(); + // relative to current dir back to parent dir then ./resource/netflowsorted + let netflow_layer_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .map(|p| p.join("raphtory/resources/test/netflow2.parquet")) + .unwrap(); + + let v1_layer_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .map(|p| p.join("raphtory/resources/test/wls2.parquet")) + .unwrap(); + + let layer_parquet_cols = vec![ + ParquetLayerCols { + parquet_dir: netflow_layer_path.to_str().unwrap(), + layer: "netflow", + src_col: "source", + dst_col: "destination", + time_col: "time", + }, + ParquetLayerCols { + parquet_dir: v1_layer_path.to_str().unwrap(), + layer: "wls", + src_col: "src", + dst_col: "dst", + time_col: "epoch_time", + }, + ]; + + let graph = ArrowGraph::load_from_parquets( + graph_dir, + layer_parquet_cols, + None, + 100, + 100, + None, + None, + 1, + ) .unwrap(); - let layer_parquet_cols = vec![ - ParquetLayerCols { - parquet_dir: netflow_layer_path.to_str().unwrap(), - layer: "netflow", - src_col: "source", - dst_col: "destination", - time_col: "time", - }, - ParquetLayerCols { - parquet_dir: v1_layer_path.to_str().unwrap(), - layer: "wls", - src_col: "src", - dst_col: "dst", - time_col: "epoch_time", - }, - ]; + let df = run_cypher("match ()-[e]->() RETURN *", &graph, true) + .await + .unwrap(); - let graph = ArrowGraph::load_from_parquets( - graph_dir, - layer_parquet_cols, - None, - 100, - 100, - None, - None, - 1, - ) - .unwrap(); + let data = df.collect().await.unwrap(); - let df = run_cypher("match ()-[e]->() RETURN *", &graph, true) + print_batches(&data).expect("failed to print batches"); + } + } + + #[tokio::test] + async fn fork_path_on_star_graph() { + let graph_dir = tempdir().unwrap(); + + let graph = Graph::new(); + load_nodes(&graph); + load_star_edges(&graph); + + let graph = ArrowGraph::from_graph(&graph, graph_dir).unwrap(); + + let df = run_cypher("match ()-[e1]->(b)-[e2]->(), (b)-[e3]->() RETURN e1.src, e1.id, b.id, e2.id, e2.dst, e3.id, e3.dst", &graph, true) .await .unwrap(); - let data = df.collect().await.unwrap(); + print_batches(&data).expect("failed to print batches"); + let df = run_cypher("match (b)-[e3]->(), ()-[e1]->(b)-[e2]->() RETURN e1.src, 
e1.id, b.id, e2.id, e2.dst, e3.id, e3.dst", &graph, true) + .await + .unwrap(); + let data = df.collect().await.unwrap(); print_batches(&data).expect("failed to print batches"); } - } - #[tokio::test] - async fn fork_path_on_star_graph() { - let graph_dir = tempdir().unwrap(); + #[tokio::test] + async fn select_table_filter_weight() { + let graph_dir = tempdir().unwrap(); + let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 10, 10); - let graph = Graph::new(); - load_nodes(&graph); - load_star_edges(&graph); + let df = run_cypher("match ()-[e {src: 0}]->() RETURN *", &graph, true) + .await + .unwrap(); - let graph = ArrowGraph::from_graph(&graph, graph_dir).unwrap(); + let data = df.collect().await.unwrap(); - let df = run_cypher("match ()-[e1]->(b)-[e2]->(), (b)-[e3]->() RETURN e1.src, e1.id, b.id, e2.id, e2.dst, e3.id, e3.dst", &graph, true) - .await - .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).expect("failed to print batches"); + print_batches(&data).expect("failed to print batches"); - let df = run_cypher("match (b)-[e3]->(), ()-[e1]->(b)-[e2]->() RETURN e1.src, e1.id, b.id, e2.id, e2.dst, e3.id, e3.dst", &graph, true) + let df = run_cypher( + "match ()-[e]->() where e.rap_time >2 and e.weight<7 RETURN *", + &graph, + true, + ) .await .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).expect("failed to print batches"); - } - #[tokio::test] - async fn select_table_filter_weight() { - let graph_dir = tempdir().unwrap(); - let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 10, 10); + let data = df.collect().await.unwrap(); + print_batches(&data).expect("failed to print batches"); + } - let df = run_cypher("match ()-[e {src: 0}]->() RETURN *", &graph, true) - .await - .unwrap(); + #[tokio::test] + async fn two_hops() { + let graph_dir = tempdir().unwrap(); + let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 100, 100); - let data = df.collect().await.unwrap(); + let query = "match ()-[e1]->()-[e2]->() return e1.src as start, e1.dst as mid, e2.dst as end ORDER BY start, mid, end"; - print_batches(&data).expect("failed to print batches"); + let rb_hop = run_to_rb(&graph, query, true).await; + let rb_join = run_to_rb(&graph, query, false).await; - let df = run_cypher( - "match ()-[e]->() where e.rap_time >2 and e.weight<7 RETURN *", - &graph, - true, - ) - .await - .unwrap(); + assert_eq!(rb_hop, rb_join); + } - let data = df.collect().await.unwrap(); - print_batches(&data).expect("failed to print batches"); - } + async fn run_to_rb(graph: &ArrowGraph, query: &str, enable_hop_optim: bool) -> RecordBatch { + let df = run_cypher(query, &graph, enable_hop_optim).await.unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + let schema = data.first().map(|rb| rb.schema()).unwrap(); + concat_batches(&schema, data.iter()).unwrap() + } - #[tokio::test] - async fn two_hops() { - let graph_dir = tempdir().unwrap(); - let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 100, 100); + #[tokio::test] + #[ignore] // Hop optimization is not yet fully implemented + async fn three_hops() { + let graph_dir = tempdir().unwrap(); + let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 100, 100); - let query = "match ()-[e1]->()-[e2]->() return e1.src as start, e1.dst as mid, e2.dst as end ORDER BY start, mid, end"; + let query = "match ()-[e1]->()-[e2]->()-[e3]->() return * ORDER BY e1.src, e1.dst, e2.src, e2.dst, e3.src, e3.dst"; + let hop_rb = run_to_rb(&graph, query, 
true).await; + let hop_join = run_to_rb(&graph, query, false).await; - let rb_hop = run_to_rb(&graph, query, true).await; - let rb_join = run_to_rb(&graph, query, false).await; + assert_eq!(hop_rb.num_rows(), hop_join.num_rows()); + assert_eq!(hop_rb, hop_join); + } - assert_eq!(rb_hop, rb_join); - } + #[tokio::test] + async fn three_hops_with_condition() { + let graph_dir = tempdir().unwrap(); + let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 100, 100); - async fn run_to_rb(graph: &ArrowGraph, query: &str, enable_hop_optim: bool) -> RecordBatch { - let df = run_cypher(query, &graph, enable_hop_optim).await.unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - let schema = data.first().map(|rb| rb.schema()).unwrap(); - concat_batches(&schema, data.iter()).unwrap() - } + let df = run_cypher( + "match ()-[e1]->()-[e2]->()<-[e3]-() where e2.weight > 5 return *", + &graph, + true, + ) + .await + .unwrap(); - #[tokio::test] - #[ignore] // Hop optimization is not yet fully implemented - async fn three_hops() { - let graph_dir = tempdir().unwrap(); - let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 100, 100); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - let query = "match ()-[e1]->()-[e2]->()-[e3]->() return * ORDER BY e1.src, e1.dst, e2.src, e2.dst, e3.src, e3.dst"; - let hop_rb = run_to_rb(&graph, query, true).await; - let hop_join = run_to_rb(&graph, query, false).await; + #[tokio::test] + async fn five_hops() { + let graph_dir = tempdir().unwrap(); + let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 100, 100); - assert_eq!(hop_rb.num_rows(), hop_join.num_rows()); - assert_eq!(hop_rb, hop_join); - } + let df = run_cypher( + "match ()-[e1]->()-[e2]->()-[e3]->()-[e4]->()-[e5]->() return *", + &graph, + true, + ) + .await + .unwrap(); - #[tokio::test] - async fn three_hops_with_condition() { - let graph_dir = tempdir().unwrap(); - let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 100, 100); - - let df = run_cypher( - "match ()-[e1]->()-[e2]->()<-[e3]-() where e2.weight > 5 return *", - &graph, - true, - ) - .await - .unwrap(); - - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - #[tokio::test] - async fn five_hops() { - let graph_dir = tempdir().unwrap(); - let graph = ArrowGraph::make_simple_graph(graph_dir, &EDGES, 100, 100); - - let df = run_cypher( - "match ()-[e1]->()-[e2]->()-[e3]->()-[e4]->()-[e5]->() return *", - &graph, - true, - ) - .await - .unwrap(); - - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + fn make_graph_with_str_col(graph_dir: impl AsRef) -> ArrowGraph { + let graph = Graph::new(); - fn make_graph_with_str_col(graph_dir: impl AsRef) -> ArrowGraph { - let graph = Graph::new(); + load_edges_with_str_props(&graph, None); - load_edges_with_str_props(&graph, None); + ArrowGraph::from_graph(&graph, graph_dir).unwrap() + } - ArrowGraph::from_graph(&graph, graph_dir).unwrap() - } + fn make_graph_with_node_props(graph_dir: impl AsRef) -> ArrowGraph { + let graph = Graph::new(); - fn make_graph_with_node_props(graph_dir: impl AsRef) -> ArrowGraph { - let graph = Graph::new(); + load_nodes(&graph); + load_edges_with_str_props(&graph, None); - load_nodes(&graph); - load_edges_with_str_props(&graph, None); + ArrowGraph::from_graph(&graph, graph_dir).unwrap() + } - ArrowGraph::from_graph(&graph, graph_dir).unwrap() - } + fn 
load_nodes(graph: &Graph) { + for (id, name, age, city) in NODES.iter() { + let nv = graph.add_node(0, *id, NO_PROPS, None).unwrap(); + nv.add_constant_properties(vec![ + ("name", Prop::str(name.as_ref())), + ("age", Prop::I64(*age)), + ]) + .unwrap(); + if let Some(city) = city { + nv.add_constant_properties(vec![("city", Prop::str(city.as_ref()))]) + .unwrap(); + } + } + } - fn load_nodes(graph: &Graph) { - for (id, name, age, city) in NODES.iter() { - let nv = graph.add_node(0, *id, NO_PROPS, None).unwrap(); - nv.add_constant_properties(vec![ - ("name", Prop::str(name.as_ref())), - ("age", Prop::I64(*age)), - ]) - .unwrap(); - if let Some(city) = city { - nv.add_constant_properties(vec![("city", Prop::str(city.as_ref()))]) + fn load_edges_with_str_props(graph: &Graph, layer: Option<&str>) { + for (src, dst, t, weight, name) in EDGES2.iter() { + graph + .add_edge( + *t, + *src, + *dst, + [ + ("weight", Prop::F64(*weight)), + ("name", Prop::Str(name.to_owned().into())), + ], + layer, + ) .unwrap(); } } - } - fn load_edges_with_str_props(graph: &Graph, layer: Option<&str>) { - for (src, dst, t, weight, name) in EDGES2.iter() { - graph - .add_edge( - *t, - *src, - *dst, - [ - ("weight", Prop::F64(*weight)), - ("name", Prop::Str(name.to_owned().into())), - ], - layer, - ) - .unwrap(); + fn load_edges_1(graph: &Graph, layer: Option<&str>) { + for (src, dst, t, weight) in EDGES.iter() { + graph + .add_edge(*t, *src, *dst, [("weight", Prop::F64(*weight))], layer) + .unwrap(); + } } - } - fn load_edges_1(graph: &Graph, layer: Option<&str>) { - for (src, dst, t, weight) in EDGES.iter() { - graph - .add_edge(*t, *src, *dst, [("weight", Prop::F64(*weight))], layer) - .unwrap(); + fn load_star_edges(graph: &Graph) { + for (src, dst, t, weight) in EDGES3.iter() { + graph + .add_edge(*t, *src, *dst, [("weight", Prop::F64(*weight))], None) + .unwrap(); + } } - } - fn load_star_edges(graph: &Graph) { - for (src, dst, t, weight) in EDGES3.iter() { - graph - .add_edge(*t, *src, *dst, [("weight", Prop::F64(*weight))], None) - .unwrap(); - } - } + #[tokio::test] + async fn select_contains() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_str_col(graph_dir); - #[tokio::test] - async fn select_contains() { - let graph_dir = tempdir().unwrap(); - let graph = make_graph_with_str_col(graph_dir); - - let df = run_cypher( - "match ()-[e]->() where e.name ends WITH 'z' RETURN e", - &graph, - true, - ) - .await - .unwrap(); - - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + let df = run_cypher( + "match ()-[e]->() where e.name ends WITH 'z' RETURN e", + &graph, + true, + ) + .await + .unwrap(); - #[tokio::test] - async fn select_contains_count() { - let graph_dir = tempdir().unwrap(); - let graph = make_graph_with_str_col(graph_dir); - - let df = run_cypher( - "match ()-[e]->() where e.name ends with 'z' return count(e.name)", - &graph, - true, - ) - .await - .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - #[tokio::test] - async fn select_all_nodes() { - let graph_dir = tempdir().unwrap(); - let graph = make_graph_with_node_props(graph_dir); + #[tokio::test] + async fn select_contains_count() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_str_col(graph_dir); - let df = run_cypher("match (n) return n", &graph, true) + let df = run_cypher( + "match ()-[e]->() where e.name ends with 'z' return count(e.name)", + &graph, + 
true, + ) .await .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - #[tokio::test] - async fn select_node_names_from_edges() { - let graph_dir = tempdir().unwrap(); - let graph = make_graph_with_node_props(graph_dir); + #[tokio::test] + async fn select_all_nodes() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_node_props(graph_dir); - let df = run_cypher("match (a)-[e]->(b) return a.name, e, b.name", &graph, true) - .await - .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + let df = run_cypher("match (n) return n", &graph, true) + .await + .unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - #[tokio::test] - async fn select_node_names_2_hops() { - let graph_dir = tempdir().unwrap(); - let graph = make_graph_with_node_props(graph_dir); - - let df = run_cypher( - "match (a)-[e1]->(b)-[e2]->(c) return a.name, b.name, c.name", - &graph, - true, - ) - .await - .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + #[tokio::test] + async fn select_node_names_from_edges() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_node_props(graph_dir); - #[tokio::test] - async fn select_count_nodes() { - let graph_dir = tempdir().unwrap(); - let graph = make_graph_with_node_props(graph_dir); + let df = run_cypher("match (a)-[e]->(b) return a.name, e, b.name", &graph, true) + .await + .unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - let df = run_cypher("match (n) return count(n)", &graph, true) - .await - .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); + #[tokio::test] + async fn select_node_names_2_hops() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_node_props(graph_dir); - let df = run_cypher("match (n) return count(*)", &graph, true) + let df = run_cypher( + "match (a)-[e1]->(b)-[e2]->(c) return a.name, b.name, c.name", + &graph, + true, + ) .await .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - #[tokio::test] - async fn select_union_multiple_layers() { - let graph_dir = tempdir().unwrap(); - let g = Graph::new(); + #[tokio::test] + async fn select_count_nodes() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_node_props(graph_dir); - load_edges_1(&g, Some("LAYER1")); - load_edges_with_str_props(&g, Some("LAYER2")); + let df = run_cypher("match (n) return count(n)", &graph, true) + .await + .unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); - let graph = ArrowGraph::from_graph(&g, graph_dir).unwrap(); + let df = run_cypher("match (n) return count(*)", &graph, true) + .await + .unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - let df = run_cypher( - "match ()-[e:_default|LAYER1|LAYER2]->() where (e.weight > 3 and e.weight < 5) or e.name starts with 'xb' return e", - &graph, - true).await.unwrap(); + #[tokio::test] + async fn select_union_multiple_layers() { + let graph_dir = tempdir().unwrap(); + let g = Graph::new(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + load_edges_1(&g, Some("LAYER1")); + load_edges_with_str_props(&g, Some("LAYER2")); - #[tokio::test] - async fn 
hop_two_different_layers() { - let graph_dir = tempdir().unwrap(); - let g = Graph::new(); + let graph = ArrowGraph::from_graph(&g, graph_dir).unwrap(); - load_edges_1(&g, Some("LAYER1")); - load_edges_with_str_props(&g, Some("LAYER2")); + let df = run_cypher( + "match ()-[e:_default|LAYER1|LAYER2]->() where (e.weight > 3 and e.weight < 5) or e.name starts with 'xb' return e", + &graph, + true).await.unwrap(); - let graph = ArrowGraph::from_graph(&g, graph_dir).unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - let df = run_cypher("match ()-[e2:LAYER2]->() RETURN *", &graph, true) - .await - .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).expect("failed to print batches"); - - let df = run_cypher( - "match ()-[e1:LAYER1]->()-[e2:LAYER2]->() RETURN count(*)", - &graph, - true, - ) - .await - .unwrap(); - - let data = df.collect().await.unwrap(); - print_batches(&data).expect("failed to print batches"); - } + #[tokio::test] + async fn hop_two_different_layers() { + let graph_dir = tempdir().unwrap(); + let g = Graph::new(); - #[tokio::test] - async fn select_all_multiple_layers() { - let graph_dir = tempdir().unwrap(); - let g = Graph::new(); + load_edges_1(&g, Some("LAYER1")); + load_edges_with_str_props(&g, Some("LAYER2")); - load_edges_1(&g, Some("LAYER1")); - load_edges_with_str_props(&g, Some("LAYER2")); + let graph = ArrowGraph::from_graph(&g, graph_dir).unwrap(); - let graph = ArrowGraph::from_graph(&g, graph_dir).unwrap(); + let df = run_cypher("match ()-[e2:LAYER2]->() RETURN *", &graph, true) + .await + .unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).expect("failed to print batches"); - let df = run_cypher("match ()-[e]->() RETURN *", &graph, true) + let df = run_cypher( + "match ()-[e1:LAYER1]->()-[e2:LAYER2]->() RETURN count(*)", + &graph, + true, + ) .await .unwrap(); - let data = df.collect().await.unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).expect("failed to print batches"); + } - print_batches(&data).expect("failed to print batches"); - } + #[tokio::test] + async fn select_all_multiple_layers() { + let graph_dir = tempdir().unwrap(); + let g = Graph::new(); - #[tokio::test] - async fn select_all_layers_expand_layer_type() { - let graph_dir = tempdir().unwrap(); - let g = Graph::new(); + load_edges_1(&g, Some("LAYER1")); + load_edges_with_str_props(&g, Some("LAYER2")); - load_edges_1(&g, Some("LAYER1")); - load_edges_with_str_props(&g, Some("LAYER2")); + let graph = ArrowGraph::from_graph(&g, graph_dir).unwrap(); - let graph = ArrowGraph::from_graph(&g, graph_dir).unwrap(); - let df = run_cypher("match ()-[e]->() return type(e), e", &graph, true) - .await - .unwrap(); + let df = run_cypher("match ()-[e]->() RETURN *", &graph, true) + .await + .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } + let data = df.collect().await.unwrap(); + + print_batches(&data).expect("failed to print batches"); + } + + #[tokio::test] + async fn select_all_layers_expand_layer_type() { + let graph_dir = tempdir().unwrap(); + let g = Graph::new(); + + load_edges_1(&g, Some("LAYER1")); + load_edges_with_str_props(&g, Some("LAYER2")); + + let graph = ArrowGraph::from_graph(&g, graph_dir).unwrap(); + let df = run_cypher("match ()-[e]->() return type(e), e", &graph, true) + .await + .unwrap(); - #[tokio::test] - async fn select_contains_count_star() { - let graph_dir = tempdir().unwrap(); - let graph = make_graph_with_str_col(graph_dir); + 
let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } - let df = run_cypher("match ()-[e]->() return count(*)", &graph, true) + #[tokio::test] + async fn select_contains_count_star() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_str_col(graph_dir); + + let df = run_cypher("match ()-[e]->() return count(*)", &graph, true) + .await + .unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } + + #[tokio::test] + async fn select_contains_limit() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_str_col(graph_dir); + + let df = run_cypher( + "match ()-[e]->() where e.name contains 'a' return e limit 2", + &graph, + true, + ) .await .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); - } - - #[tokio::test] - async fn select_contains_limit() { - let graph_dir = tempdir().unwrap(); - let graph = make_graph_with_str_col(graph_dir); - - let df = run_cypher( - "match ()-[e]->() where e.name contains 'a' return e limit 2", - &graph, - true, - ) - .await - .unwrap(); - let data = df.collect().await.unwrap(); - print_batches(&data).unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } } } diff --git a/raphtory-cypher/src/transpiler/exprs.rs b/raphtory-cypher/src/transpiler/exprs.rs index f42f57df3e..6d47c8b14e 100644 --- a/raphtory-cypher/src/transpiler/exprs.rs +++ b/raphtory-cypher/src/transpiler/exprs.rs @@ -1,6 +1,6 @@ use sqlparser::ast::{self as sql_ast}; -use crate::{Clause, Query, Return}; +use crate::parser::ast::{Clause, Query, Return}; pub fn parse_limit(query: &Query) -> Option { query.clauses().iter().find_map(|clause| match clause { diff --git a/raphtory-storage/Cargo.toml b/raphtory-storage/Cargo.toml deleted file mode 100644 index cffde0efed..0000000000 --- a/raphtory-storage/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "raphtory-storage" -version.workspace = true -documentation.workspace = true -repository.workspace = true -license.workspace = true -readme.workspace = true -homepage.workspace = true -keywords.workspace = true -authors.workspace = true -rust-version.workspace = true -edition.workspace = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -itertools.workspace = true -parking_lot.workspace = true -raphtory = { path = "../raphtory", features = ["io", "arrow", "python"] } -raphtory-arrow.workspace = true -rayon.workspace = true -polars-arrow.workspace = true -ahash.workspace = true -tempfile.workspace = true -dashmap.workspace = true -serde_json.workspace = true -pyo3.workspace = true - -[dev-dependencies] -serde.workspace = true -proptest.workspace = true diff --git a/raphtory-storage/examples/arrow_graph_cc.rs b/raphtory-storage/examples/arrow_graph_cc.rs deleted file mode 100644 index 359a3ce4aa..0000000000 --- a/raphtory-storage/examples/arrow_graph_cc.rs +++ /dev/null @@ -1,218 +0,0 @@ -use raphtory::{ - arrow::{ - graph_impl::{ArrowGraph, ParquetLayerCols}, - query::{ast::Query, executors::rayon2, ForwardState}, - }, - core::entities::VID, - db::api::view::GraphViewOps, -}; -use raphtory_arrow::{algorithms::connected_components, graph_fragment::TempColGraphFragment}; -use std::{io::Write, time::Instant}; - -fn main() { - // Retrieve command line arguments - let args = || std::env::args(); - - let graph_dir = args().nth(1).expect("Graph directory not provided"); - - let graph2 = if let Ok(_) = 
std::fs::metadata(&graph_dir) { - ArrowGraph::load_from_dir(graph_dir).expect("Cannot open graph") - } else { - let parquet_dir = &args().nth(2).expect("Parquet directory not provided"); - - let chunk_size = args() - .nth(3) - .and_then(|x| x.parse::().ok()) - .unwrap_or(268_435_456); - let num_threads = args() - .nth(4) - .and_then(|x| x.parse::().ok()) - .unwrap_or(8); - let t_props_chunk_size = args() - .nth(5) - .and_then(|x| x.parse::().ok()) - .unwrap_or(chunk_size / 16); - - let concurrent_files = args() - .nth(6) - .and_then(|x| x.parse::().ok()) - .unwrap_or(1); - - let read_chunk_size = args() - .nth(7) - .and_then(|x| x.parse::().ok()) - .unwrap_or(4_000_000); - - println!( - "Loading graph from parquet: {} with chunk size: {chunk_size} t_prop chunk size {t_props_chunk_size}, concurrent_files: {concurrent_files} and threads: {num_threads}", - parquet_dir - ); - - let now = Instant::now(); - let graph = ArrowGraph::load_from_parquets( - graph_dir, - vec![ParquetLayerCols { - parquet_dir, - layer: "default", - src_col: "src", - dst_col: "dst", - time_col: "time", - }], - None, - chunk_size, - t_props_chunk_size, - Some(read_chunk_size), - Some(concurrent_files), - num_threads, - ) - .expect("Cannot load graph"); - println!("########## Load took {:?} ########## ", now.elapsed()); - graph - }; - - connected_components(&graph2); - hop_query(&graph2); -} - -fn connected_components(tg: &ArrowGraph) { - println!("Graph has {} nodes", tg.count_nodes()); - println!("Graph has {} edges", tg.count_edges()); - - let now = Instant::now(); - // let ccs = weakly_connected_components(&graph2, 100, None).group_by(); - let out = connected_components::connected_components(tg.as_ref()); - println!( - "########## Arrow CC took {:?} ########## len: {}", - now.elapsed(), - out.len() - ); -} - -fn hop_query(tg: &ArrowGraph) { - let now = Instant::now(); - - let nodes = vec![ - VID(309582099), - VID(54328484), - VID(246173700), - VID(16700593), - VID(177352288), - VID(180812214), - VID(52391093), - VID(399368698), - VID(263103204), - VID(379960042), - VID(416420825), - VID(199488353), - VID(224963182), - VID(51977241), - VID(856781), - VID(444466102), - VID(418741608), - VID(192869236), - VID(299536904), - VID(85715682), - VID(132369141), - VID(202535826), - VID(333437339), - VID(263640094), - VID(33964780), - VID(379081115), - VID(290623079), - VID(395279946), - VID(133035021), - VID(249927504), - VID(261634684), - VID(430970739), - VID(253060757), - VID(272814697), - VID(158376132), - VID(86272904), - VID(326943324), - VID(82327004), - VID(261701485), - VID(109463839), - VID(117863968), - VID(163145864), - VID(330916934), - VID(211355612), - VID(281370847), - VID(371456910), - VID(299845460), - VID(344814299), - VID(90076774), - VID(277046483), - VID(202223853), - VID(315635830), - VID(404087723), - VID(217660841), - VID(262444201), - VID(38909930), - VID(299362410), - VID(436843462), - VID(228264831), - VID(146444304), - VID(89715034), - VID(109094148), - VID(71703352), - VID(253889004), - VID(264785705), - VID(36547407), - VID(158966904), - VID(319912238), - VID(20208726), - VID(156259436), - VID(1721625), - VID(205725206), - VID(442549275), - VID(410095341), - VID(339347119), - VID(318108647), - VID(235328888), - VID(398864679), - VID(18989798), - VID(257431550), - VID(299924240), - VID(296379526), - VID(9157244), - VID(89996738), - VID(170704515), - VID(134164014), - VID(219867467), - VID(244239784), - VID(258341225), - VID(274228163), - VID(342058645), - VID(66680949), - VID(150576676), - 
VID(248701795), - VID(221102041), - VID(325407184), - VID(45609419), - VID(69308556), - VID(130864213), - VID(205326867), - ]; - - let query: Query = Query::new() - .out_limit("default", 100) - // .out_limit("default", 100) - // .out_limit("default", 100) - // .out_limit("default", 100) - // .out_limit("default", 100) - .path("hop", |mut writer, state: ForwardState| { - serde_json::to_writer(&mut writer, &state.path).unwrap(); - write!(writer, "\n").unwrap(); - }); - - let _ = rayon2::execute::( - query, - raphtory::arrow::query::NodeSource::NodeIds(nodes), - tg, - |node| { - let earliest = node.earliest(); - ForwardState::at_time(node, earliest, 100) - }, - ); - println!("########## Arrow Hop took {:?} ##########", now.elapsed()); -} diff --git a/raphtory-storage/examples/hop_hop_forward.rs b/raphtory-storage/examples/hop_hop_forward.rs deleted file mode 100644 index d9f9043357..0000000000 --- a/raphtory-storage/examples/hop_hop_forward.rs +++ /dev/null @@ -1,103 +0,0 @@ -use raphtory_arrow::{graph::TemporalGraph, load::ExternalEdgeList}; -use raphtory_storage::lanl::exfiltration::query1; -use std::time::Instant; - -fn main() { - let graph_dir = std::env::args() - .nth(1) - .expect("please supply a graph directory"); - - println!("graph_dir: {:?}", graph_dir); - - let now = Instant::now(); - let graph = if std::fs::read_dir(&graph_dir).is_ok() { - TemporalGraph::new(&graph_dir).expect("failed to load graph") - } else { - let netflow_dir = std::env::args() - .nth(2) - .expect("please supply a wls directory"); - - let v1_dir = std::env::args() - .nth(3) - .expect("please supply a v1 directory"); - - let v2_dir = std::env::args() - .nth(4) - .expect("please supply a v2 directory"); - - let num_threads = std::env::args() - .nth(5) - .and_then(|x| x.parse::().ok()) - .unwrap_or(8); - let chunk_size = std::env::args() - .nth(6) - .and_then(|x| x.parse::().ok()) - .unwrap_or(8_388_608); - let t_props_chunk_size = std::env::args() - .nth(7) - .and_then(|x| x.parse::().ok()) - .unwrap_or(20_970_100); - - println!("netflow_dir: {:?}", netflow_dir); - println!("v1_dir: {:?}", v1_dir); - println!("v2_dir: {:?}", v2_dir); - println!("chunk_size: {:?}", chunk_size); - println!("t_props_chunk_size: {:?}", t_props_chunk_size); - println!("num_threads: {:?}", num_threads); - - let layered_edge_list = [ - ExternalEdgeList::new("netflow", netflow_dir, "src", "dst", "epoch_time") - .expect("failed to load netflow"), - ExternalEdgeList::new("events_1v", v1_dir, "src", "dst", "epoch_time") - .expect("failed to load events_v1"), - ExternalEdgeList::new("events_2v", v2_dir, "src", "dst", "epoch_time") - .expect("failed to load events_v2"), - ]; - let graph = TemporalGraph::from_edge_lists( - num_threads, - chunk_size, - t_props_chunk_size, - None, - None, - graph_dir, - layered_edge_list, - None, - ) - .expect("failed to load graph"); - graph - }; - println!("Time taken to load graph: {:?}", now.elapsed()); - - println!("graph nodes count {}", graph.num_nodes()); - println!("graph edges count netflow {}", graph.num_edges(0)); - println!("graph edges count 1v {}", graph.num_edges(1)); - println!("graph edges count 2v {}", graph.num_edges(2)); - let now = Instant::now(); - - // MATCH - // (E)<-[nf1:Netflow]-(B)<-[login1:Events2v]-(A), (B)<-[prog1:Events1v]-(B) - // WHERE A <> B AND B <> E AND A <> E - // AND login1.eventID = 4624 - // AND prog1.eventID = 4688 - // AND nf1.dstBytes > 100000000 - // // time constraints within each path - // AND login1.epochtime < prog1.epochtime - // AND prog1.epochtime < 
nf1.epochtime - // AND nf1.epochtime - login1.epochtime <= 30 - // RETURN count(*) - - // Launched job 7 - // Number of answers: 2,992,551 - // CPU times: user 25.9 ms, sys: 11.7 ms, total: 37.6 ms - // Wall time: 21.8 s - - // Devices (vertices): 159,245 - // Netflow (edges): 317,164,045 - // Host event 1-vertex (edges): 33,480,483 - // Host event 2-vertex (edges): 97,716,529 - // Total (edges): 448,361,057 - - let count = query1::run(&graph); - - println!("Time taken: {:?}, count: {:?}", now.elapsed(), count); -} diff --git a/raphtory-storage/examples/hop_hop_forward_count.rs b/raphtory-storage/examples/hop_hop_forward_count.rs deleted file mode 100644 index 6837666ee3..0000000000 --- a/raphtory-storage/examples/hop_hop_forward_count.rs +++ /dev/null @@ -1,64 +0,0 @@ -use raphtory_arrow::{graph::TemporalGraph, load::ExternalEdgeList}; - -use raphtory_storage::lanl::exfiltration::count::query_total; -use std::time::Instant; - -fn main() { - let window = 30; // change me to get bigger windows - let graph_dir = std::env::args() - .nth(1) - .expect("please supply a graph directory"); - - println!("graph_dir: {:?}", graph_dir); - - let now = Instant::now(); - let graph = if std::fs::read_dir(&graph_dir).is_ok() { - TemporalGraph::new(&graph_dir).expect("failed to load graph") - } else { - let netflow_dir = std::env::args() - .nth(2) - .expect("please supply a wls directory"); - - let v1_dir = std::env::args() - .nth(3) - .expect("please supply a v1 directory"); - - let v2_dir = std::env::args() - .nth(4) - .expect("please supply a v2 directory"); - - println!("netflow_dir: {:?}", netflow_dir); - println!("v1_dir: {:?}", v1_dir); - println!("v2_dir: {:?}", v2_dir); - - let layered_edge_list = [ - ExternalEdgeList::new("netflow", netflow_dir, "src", "dst", "epoch_time") - .expect("failed to load netflow"), - ExternalEdgeList::new("events_1v", v1_dir, "src", "dst", "epoch_time") - .expect("failed to load events_v1"), - ExternalEdgeList::new("events_2v", v2_dir, "src", "dst", "epoch_time") - .expect("failed to load events_v2"), - ]; - let chunk_size = 8_388_608; - let t_props_chunk_size = 20_970_100; - let graph = TemporalGraph::from_edge_lists( - 8, - chunk_size, - t_props_chunk_size, - None, - None, - graph_dir, - layered_edge_list, - None, - ) - .expect("failed to load graph"); - graph - }; - println!("Time taken to load graph: {:?}", now.elapsed()); - - let now = Instant::now(); - - let count: usize = query_total(&graph, window); - - println!("Time taken: {:?}, count: {:?}", now.elapsed(), count); -} diff --git a/raphtory-storage/examples/hop_hop_forward_materialised.rs b/raphtory-storage/examples/hop_hop_forward_materialised.rs deleted file mode 100644 index 8ae250b4ea..0000000000 --- a/raphtory-storage/examples/hop_hop_forward_materialised.rs +++ /dev/null @@ -1,64 +0,0 @@ -use raphtory_arrow::{graph::TemporalGraph, load::ExternalEdgeList}; -use raphtory_storage::lanl::exfiltration::list::query; -use rayon::prelude::*; -use std::time::Instant; - -fn main() { - let window = 30; // change me to get bigger windows - let graph_dir = std::env::args() - .nth(1) - .expect("please supply a graph directory"); - - println!("graph_dir: {:?}", graph_dir); - - let now = Instant::now(); - let graph = if std::fs::read_dir(&graph_dir).is_ok() { - TemporalGraph::new(&graph_dir).expect("failed to load graph") - } else { - let netflow_dir = std::env::args() - .nth(2) - .expect("please supply a wls directory"); - - let v1_dir = std::env::args() - .nth(3) - .expect("please supply a v1 directory"); - - let 
v2_dir = std::env::args() - .nth(4) - .expect("please supply a v2 directory"); - - println!("netflow_dir: {:?}", netflow_dir); - println!("v1_dir: {:?}", v1_dir); - println!("v2_dir: {:?}", v2_dir); - - let layered_edge_list = [ - ExternalEdgeList::new("netflow", netflow_dir, "src", "dst", "epoch_time") - .expect("failed to load netflow"), - ExternalEdgeList::new("events_1v", v1_dir, "src", "dst", "epoch_time") - .expect("failed to load events_v1"), - ExternalEdgeList::new("events_2v", v2_dir, "src", "dst", "epoch_time") - .expect("failed to load events_v2"), - ]; - let chunk_size = 8_388_608; - let t_props_chunk_size = 20_970_100; - let graph = TemporalGraph::from_edge_lists( - 8, - chunk_size, - t_props_chunk_size, - None, - None, - graph_dir, - layered_edge_list, - None, - ) - .expect("failed to load graph"); - graph - }; - println!("Time taken to load graph: {:?}", now.elapsed()); - - let now = Instant::now(); - - let iter = query(&graph, window); - let count = iter.count(); - println!("Time taken: {:?}, count: {count}", now.elapsed()); -} diff --git a/raphtory-storage/python_scripts/data_loader.py b/raphtory-storage/python_scripts/data_loader.py deleted file mode 100644 index ba9348ebdc..0000000000 --- a/raphtory-storage/python_scripts/data_loader.py +++ /dev/null @@ -1,19 +0,0 @@ -from pathlib import Path -import polars as pl -from tqdm import tqdm - -base = Path.cwd() -files = sorted((base / "edges").iterdir()) -out_dir = base / "edges_parquet" -out_dir.mkdir(exist_ok=True) - -offset = 0 -for file in tqdm(files): - df = pl.read_csv(file, separator=" ", has_header=False, new_columns=["src", "dst"]) - num_rows = df.shape[0] - df = df.with_columns([pl.col("src").alias("src_hash"), pl.col("dst").alias("dst_hash"), pl.int_range(start=offset, end=offset + num_rows, dtype=pl.Int64).alias("time")]) - offset += num_rows - out_file = out_dir / (file.stem + ".snappy.parquet") - df = df.cast({"src": pl.UInt64, "dst": pl.UInt64, "src_hash": pl.Int64, "dst_hash": pl.Int64, "time": pl.Int64}) - df.write_parquet(out_file, compression="snappy") - print(f"processed {file}") diff --git a/raphtory-storage/src/lanl/exfiltration/count.rs b/raphtory-storage/src/lanl/exfiltration/count.rs deleted file mode 100644 index 624db0f5ba..0000000000 --- a/raphtory-storage/src/lanl/exfiltration/count.rs +++ /dev/null @@ -1,495 +0,0 @@ -use crate::lanl::exfiltration::find_active_nodes; -use itertools::kmerge_by; -use raphtory::{ - arrow::Time, - core::{ - entities::{EID, VID}, - Direction, - }, -}; -use raphtory_arrow::{ - edge::Edge, global_order::GlobalOrder, graph::TemporalGraph, - graph_fragment::TempColGraphFragment, prelude::*, -}; -use rayon::prelude::*; -use std::{ - cmp::Ordering, - collections::VecDeque, - sync::{ - atomic::{AtomicU64, Ordering::Relaxed}, - Arc, - }, - time::Instant, -}; - -#[derive(Debug, Eq, Copy, Clone)] -enum Window { - Start { t: i64 }, - End { t: i64 }, -} - -impl PartialOrd for Window { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option { - self.t().partial_cmp(&other.t()) - } -} - -impl Ord for Window { - #[inline] - fn cmp(&self, other: &Self) -> Ordering { - self.t().cmp(&other.t()) - } -} - -impl PartialEq for Window { - #[inline] - fn eq(&self, other: &Self) -> bool { - self.t() == other.t() - } -} - -impl Window { - #[inline] - pub fn t(&self) -> i64 { - match self { - Window::Start { t, .. } => *t, - Window::End { t, .. 
} => *t, - } - } -} -#[inline] -fn window_bounds(t: i64, w: i64) -> [Window; 2] { - [Window::Start { t: t - w }, Window::End { t }] -} -#[inline] -fn valid_netflow_events( - nft_graph: &TempColGraphFragment, - b_vid: VID, - bytes_prop_id: usize, - window: i64, -) -> Option)>> { - let nft_events: Vec<_> = nft_graph - .out_edges_par(b_vid) - .filter(|(_, e_vid)| *e_vid != b_vid) - .filter_map(|(edge_id, e_vid)| { - let nf1 = nft_graph.edge(edge_id); - let mut valid_events: Vec<_> = nf1 - .par_prop_items_unchecked::(bytes_prop_id) - .unwrap() - .filter_map(move |(t, v)| (v > 100_000_000).then(|| window_bounds(t, window))) - .flatten() - .collect(); - if valid_events.is_empty() { - None - } else { - valid_events.sort_by(|a, b| b.cmp(a)); // reverse sort - Some((e_vid.into(), valid_events)) - } - }) - .collect(); - if nft_events.is_empty() { - None - } else { - Some(nft_events) - } -} - -#[derive(Debug, Default)] -struct MergeCounter { - count: usize, - active_windows: VecDeque, - lookup: Vec<(Time, usize)>, - event_count: usize, -} - -impl MergeCounter { - #[inline] - fn update_lookup(&mut self, t: Time) { - if let Some(last) = self.lookup.last_mut().filter(|(last_t, _)| last_t == &t) { - last.1 = self.count; - } else { - self.lookup.push((t, self.count)) - } - } - - #[inline] - fn update_window_start(&mut self, t: Time) { - //finished a window - let index = self.active_windows.pop_front().unwrap(); - let num_events = self.event_count - index; - if num_events != 0 { - self.count -= num_events; - self.update_lookup(t); - } - } - #[inline] - fn update_nft(&mut self, event: Window) { - match event { - Window::Start { t } => self.update_window_start(t), - Window::End { .. } => { - //start a new window - self.active_windows.push_back(self.event_count); - } - } - } - #[inline] - fn update_prog1(&mut self, prog1: i64) { - if !self.active_windows.is_empty() { - self.event_count += 1; - self.count += self.active_windows.len(); - self.update_lookup(prog1); - } - } - #[inline] - fn finish( - mut self, - remaining_windows: impl Iterator, - ) -> Option> { - if !self.active_windows.is_empty() { - for w in remaining_windows { - match w { - Window::Start { t } => self.update_window_start(t), - _ => {} - } - if self.active_windows.is_empty() { - break; - } - } - } - if self.lookup.is_empty() { - None - } else { - Some(self.lookup) - } - } -} - -#[inline] -fn merge_nft_prog1( - events_1v_edge: &Edge, - nft_events: impl IntoIterator, - prop_id: usize, -) -> Option> { - let prog_events = events_1v_edge - .prop_items_unchecked::(prop_id) - .filter_map(|(t, v)| (v == 4688).then_some(t)) - .rev(); - let mut nft_events_iter = nft_events.into_iter(); - let mut next_nft_event = nft_events_iter.next(); - let mut merge_counter = MergeCounter::default(); - for t in prog_events { - while next_nft_event.filter(|event| event.t() > t).is_some() { - merge_counter.update_nft(next_nft_event.unwrap()); - next_nft_event = nft_events_iter.next(); - } - if next_nft_event.is_none() { - // no more windows - break; - } - merge_counter.update_prog1(t); - } - // finalise any remaining open windows! 
- merge_counter.finish(next_nft_event.into_iter().chain(nft_events_iter)) -} - -#[inline] -fn loop_events( - a_vid: VID, - events_1v_edge: &Edge, - login_edge: &Edge, - nft_events: &[(VID, Vec)], - prog1_prop_id: usize, - login_prop_id: usize, -) -> Option { - let index = nft_events.binary_search_by_key(&a_vid, |(v, _)| *v).ok()?; - let nf_events_iter = nft_events[index].1.iter().copied(); - let prog1_map = merge_nft_prog1(events_1v_edge, nf_events_iter, prog1_prop_id)?; - local_login_count(login_edge, login_prop_id, &prog1_map) -} - -#[inline] -fn local_login_count( - login_edge: &Edge, - prop_id: usize, - prog1_map: &[(Time, usize)], -) -> Option { - if login_edge.timestamp_slice().iter().next()? >= prog1_map.first()?.0 { - return None; - } - login_edge - .par_prop_items_unchecked::(prop_id) - .map(|iter| { - iter.filter(|(_, id)| *id == 4624) - .map(|(t, _)| { - let index = prog1_map.partition_point(|(ti, _)| ti > &t); - if index > 0 { - prog1_map[index - 1].1 - } else { - 0 - } - }) - .sum() - }) -} - -#[inline] -fn count_logins( - b_vid: VID, - prog1_map: &[(Time, usize)], - nft_events: &[(VID, Vec)], - events_2v_graph: &TempColGraphFragment, - events_1v_edge: &Edge, - edges_par_iter: impl IndexedParallelIterator, - login_prop_id: usize, - prog1_prop_id: usize, -) -> usize { - edges_par_iter - .filter(|(_, a_vid)| *a_vid != b_vid) - .filter_map(|(eid, a_vid)| { - let edge = events_2v_graph.edge(eid); - local_login_count(&edge, login_prop_id, prog1_map).map(move |c| (a_vid, edge, c)) - }) - .filter(|(_, _, count)| *count != 0) - .map(|(a_vid, edge, count)| { - let loop_count = loop_events( - a_vid, - events_1v_edge, - &edge, - nft_events, - prog1_prop_id, - login_prop_id, - ) - .unwrap_or(0); - count - loop_count - }) - .sum() -} - -#[inline] -pub fn query( - g: &TemporalGraph, - window: i64, -) -> Option + '_> { - // MATCH - // (E)<-[nf1:Netflow]-(B)<-[login1:Events2v]-(A), (B)<-[prog1:Events1v]-(B) - // WHERE A <> B AND B <> E AND A <> E - // AND login1.eventID = 4624 - // AND prog1.eventID = 4688 - // AND nf1.dstBytes > 100000000 - // // time constraints within each path - // AND login1.epochtime < prog1.epochtime - // AND prog1.epochtime < nf1.epochtime - // AND nf1.epochtime - login1.epochtime <= 30 - // RETURN count(*) - - // Launched job 7 - // Number of answers: 2,992,551 - // CPU times: user 25.9 ms, sys: 11.7 ms, total: 37.6 ms - // Wall time: 21.8 s - - // Devices (vertices): 159,245 - // Netflow (edges): 317,164,045 - // Host event 1-vertex (edges): 33,480,483 - // Host event 2-vertex (edges): 97,716,529 - // Total (edges): 448,361,057 - let nft = g.find_layer_id("netflow")?; - let nft_graph = g.layer(nft); - let events_1v = g.find_layer_id("events_1v")?; - let events_1v_graph = g.layer(events_1v); - let events_2v = g.find_layer_id("events_2v")?; - let events_2v_graph = g.layer(events_2v); - - let bytes_prop_id = g.edge_property_id("dst_bytes", nft)?; - let event_id_prop_id_2v = g.edge_property_id("event_id", events_2v)?; - let prog1_prop_id = g.edge_property_id("event_id", events_1v)?; - - let log_nodes = find_active_nodes(events_1v_graph); - - let valid_netflow_events_ms = Arc::new(AtomicU64::default()); - let valid_netflow_events_ms_ref = valid_netflow_events_ms.clone(); - let prog1_merge_ms = Arc::new(AtomicU64::default()); - let prog1_merge_ms_ref = prog1_merge_ms.clone(); - let count_login_ms = Arc::new(AtomicU64::default()); - let count_login_ms_ref = count_login_ms.clone(); - - let count = log_nodes.into_par_iter().flat_map(move |b_vid| { - let login_edges = 
events_2v_graph - .in_edges_par(b_vid) - .map(|(eid, a_vid)| (eid.into(), a_vid.into())); - let (self_loop, _) = events_1v_graph - .edges_iter(b_vid, Direction::OUT) - .filter(|(_, n_vid)| *n_vid == b_vid) - .next()?; - - let now = Instant::now(); - let nft_events = valid_netflow_events(nft_graph, b_vid, bytes_prop_id, window)?; - valid_netflow_events_ms_ref.fetch_add(now.elapsed().as_millis() as u64, Relaxed); - - let now = Instant::now(); - let merged_nft_events = kmerge_by( - nft_events.iter().map(|(_, windows)| windows), - |a: &&Window, b: &&Window| a >= b, - ) - .copied(); - let events_1v_edge = events_1v_graph.edge(self_loop); - - let prog1_eventmap = merge_nft_prog1(&events_1v_edge, merged_nft_events, prog1_prop_id)?; - prog1_merge_ms_ref.fetch_add(now.elapsed().as_millis() as u64, Relaxed); - - let now = Instant::now(); - let count = count_logins( - b_vid, - &prog1_eventmap, - &nft_events, - events_2v_graph, - &events_1v_edge, - login_edges, - event_id_prop_id_2v, - prog1_prop_id, - ); - count_login_ms_ref.fetch_add(now.elapsed().as_millis() as u64, Relaxed); - Some((b_vid, count)) - }); - println!( - "finding valid netflow took {}ms", - valid_netflow_events_ms.load(Relaxed) - ); - println!( - "merging netflow with prog1 took {}ms", - prog1_merge_ms.load(Relaxed) - ); - println!("counting logins took {}ms", count_login_ms.load(Relaxed)); - Some(count) -} - -pub fn query_total(g: &TemporalGraph, window: i64) -> usize { - query(g, window) - .into_par_iter() - .flatten() - .map(|(_, c)| c) - .sum() -} - -#[cfg(test)] -mod test { - use super::query_total; - use polars_arrow::{ - array::{PrimitiveArray, StructArray}, - datatypes::{ArrowDataType as DataType, Field}, - }; - use raphtory_arrow::{ - global_order::GlobalMap, graph::TemporalGraph, graph_fragment::TempColGraphFragment, - }; - use std::sync::Arc; - use tempfile::TempDir; - - #[test] - fn test_one_path() { - let test_dir = TempDir::new().unwrap(); - let go = Arc::new(GlobalMap::from(vec![1u64, 2u64, 3u64])); - let vertices = PrimitiveArray::from_vec(vec![1u64, 2u64, 3u64]).boxed(); - - let srcs = PrimitiveArray::from_vec(vec![1u64, 1, 1, 1]).boxed(); - let dsts = PrimitiveArray::from_vec(vec![2u64, 2, 2, 2]).boxed(); - let time = PrimitiveArray::from_vec(vec![0i64, 1, 10, 11]).boxed(); - let event_id = PrimitiveArray::from_vec(vec![4624i64, 4624, 4624, 23]).boxed(); - let chunk = StructArray::new( - DataType::Struct(vec![ - Field::new("src", DataType::UInt64, false), - Field::new("dst", DataType::UInt64, false), - Field::new("time", DataType::Int64, false), - Field::new("event_id", DataType::Int64, false), - ]), - vec![srcs, dsts, time, event_id], - None, - ); - - let graph_events2v = TempColGraphFragment::load_from_edge_list( - &test_dir.path().join("events2v"), - 0, - 100, - 100, - go.clone(), - vertices.clone(), - 0, - 1, - 2, - vec![chunk], - ) - .unwrap(); - - let srcs = PrimitiveArray::from_vec(vec![2u64, 2, 2]).boxed(); - let dsts = PrimitiveArray::from_vec(vec![2u64, 2, 2]).boxed(); - let time = PrimitiveArray::from_vec(vec![1i64, 11, 12]).boxed(); - let event_id = PrimitiveArray::from_vec(vec![4688i64, 4688, 4687]).boxed(); - let chunk = StructArray::new( - DataType::Struct(vec![ - Field::new("src", DataType::UInt64, false), - Field::new("dst", DataType::UInt64, false), - Field::new("time", DataType::Int64, false), - Field::new("event_id", DataType::Int64, false), - ]), - vec![srcs, dsts, time, event_id], - None, - ); - - let graph_events1v = TempColGraphFragment::load_from_edge_list( - 
-            &test_dir.path().join("events1v"),
-            0,
-            100,
-            100,
-            go.clone(),
-            vertices.clone(),
-            0,
-            1,
-            2,
-            vec![chunk],
-        )
-        .unwrap();
-
-        let srcs = PrimitiveArray::from_vec(vec![2u64, 2u64, 2, 2]).boxed();
-        let dsts = PrimitiveArray::from_vec(vec![3u64, 3u64, 3, 3]).boxed();
-        let time = PrimitiveArray::from_vec(vec![2i64, 3, 4, 31]).boxed();
-        let event_id =
-            PrimitiveArray::from_vec(vec![100_000_005i64, 100_000_005i64, 10, 100_000_005i64])
-                .boxed();
-        let chunk = StructArray::new(
-            DataType::Struct(vec![
-                Field::new("src", DataType::UInt64, false),
-                Field::new("dst", DataType::UInt64, false),
-                Field::new("time", DataType::Int64, false),
-                Field::new("dst_bytes", DataType::Int64, false),
-            ]),
-            vec![srcs, dsts, time, event_id],
-            None,
-        );
-
-        let graph_netflow = TempColGraphFragment::load_from_edge_list(
-            &test_dir.path().join("netflow"),
-            0,
-            100,
-            100,
-            go.clone(),
-            vertices.clone(),
-            0,
-            1,
-            2,
-            vec![chunk],
-        )
-        .unwrap();
-
-        let graph = TemporalGraph::new_from_layers(
-            vertices,
-            go,
-            vec![graph_events2v, graph_events1v, graph_netflow],
-            vec![
-                "events_2v".to_owned(),
-                "events_1v".to_owned(),
-                "netflow".to_owned(),
-            ],
-        );
-        let actual = query_total(&graph, 30);
-        assert_eq!(actual, 4);
-    }
-}
diff --git a/raphtory-storage/src/lanl/exfiltration/list.rs b/raphtory-storage/src/lanl/exfiltration/list.rs
deleted file mode 100644
index 2d3bc8f508..0000000000
--- a/raphtory-storage/src/lanl/exfiltration/list.rs
+++ /dev/null
@@ -1,621 +0,0 @@
-use crate::lanl::exfiltration::find_active_nodes;
-use itertools::{kmerge_by, Itertools};
-use raphtory::core::{entities::VID, Direction};
-use raphtory_arrow::{
-    edge::ExplodedEdge, global_order::GlobalOrder, graph::TemporalGraph,
-    graph_fragment::TempColGraphFragment, Time,
-};
-use rayon::{iter::ParallelIterator, prelude::*};
-use std::{cmp::Ordering, collections::VecDeque, fmt::Debug};
-
-#[inline]
-fn valid_netflow_events(
-    nft_graph: &TempColGraphFragment,
-    b_vid: VID,
-    bytes_prop_id: usize,
-) -> impl Iterator {
-    kmerge_by(
-        nft_graph
-            .edges_iter(b_vid, Direction::OUT)
-            .into_iter()
-            .filter(|(_, e_vid)| *e_vid != b_vid)
-            .map(move |(edge_id, _)| {
-                nft_graph
-                    .edge(edge_id)
-                    .explode()
-                    .filter(move |e| e.prop::<i64>(bytes_prop_id) > Some(100_000_000))
-                    .rev()
-            }),
-        |e1: &ExplodedEdge, e2: &ExplodedEdge| e1.timestamp() >= e2.timestamp(),
-    )
-    .map(Event::Netflow)
-}
-
-#[inline]
-fn login_edges(
-    events_2v_graph: &TempColGraphFragment,
-    b_vid: VID,
-    login_event_prop_id: usize,
-) -> impl Iterator {
-    kmerge_by(
-        events_2v_graph
-            .edges_iter(b_vid, Direction::IN)
-            .into_iter()
-            .filter(|(_, a_vid)| *a_vid != b_vid)
-            .map(move |(edge_id, _)| {
-                events_2v_graph
-                    .edge(edge_id)
-                    .explode()
-                    .filter(move |e| e.prop::<i64>(login_event_prop_id) == Some(4624))
-                    .rev()
-            }),
-        |e1: &ExplodedEdge, e2: &ExplodedEdge| e1.timestamp() >= e2.timestamp(),
-    )
-    .map(Event::Login)
-}
-
-#[inline]
-fn prog1_edges(
-    event_1v_graph: &TempColGraphFragment,
-    b_vid: VID,
-    prog1_event_prop_id: usize,
-) -> impl Iterator {
-    event_1v_graph
-        .edges_iter(b_vid, Direction::OUT)
-        .into_iter()
-        .filter(move |(_, vid)| *vid == b_vid)
-        .flat_map(move |(eid, _)| {
-            event_1v_graph
-                .edge(eid)
-                .explode()
-                .filter(move |e| e.prop::<i64>(prog1_event_prop_id) == Some(4688))
-        })
-        .rev()
-        .map(Event::Prog1)
-}
-
-#[derive(Debug, Clone)]
-enum Event<'a> {
-    Login(ExplodedEdge<'a>),
-    Prog1(ExplodedEdge<'a>),
-    Netflow(ExplodedEdge<'a>),
-}
-
-impl<'a> PartialEq for Event<'a> {
-    #[inline]
-    fn eq(&self, other: &Self) -> bool {
-        match (self, other) {
-            (Event::Login(e1), Event::Login(e2)) => e1.timestamp() == e2.timestamp(),
-            (Event::Prog1(e1), Event::Prog1(e2)) => e1.timestamp() == e2.timestamp(),
-            (Event::Netflow(e1), Event::Netflow(e2)) => e1.timestamp() == e2.timestamp(),
-            _ => false,
-        }
-    }
-}
-
-impl<'a> Eq for Event<'a> {}
-
-impl<'a> PartialOrd for Event<'a> {
-    #[inline]
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-// the order of the different events at the same time point is crucial, need Login, then Prog1
-// and then Netflow to get the window bounds correct
-impl<'a> Ord for Event<'a> {
-    #[inline]
-    fn cmp(&self, other: &Self) -> Ordering {
-        match other.t().cmp(&self.t()) {
-            Ordering::Less => Ordering::Less,
-            Ordering::Equal => match self {
-                Event::Login(_) => match other {
-                    Event::Login(_) => Ordering::Equal,
-                    Event::Prog1(_) => Ordering::Less,
-                    Event::Netflow(_) => Ordering::Less,
-                },
-                Event::Prog1(_) => match other {
-                    Event::Login(_) => Ordering::Greater,
-                    Event::Prog1(_) => Ordering::Equal,
-                    Event::Netflow(_) => Ordering::Less,
-                },
-                Event::Netflow(_) => match other {
-                    Event::Login(_) => Ordering::Greater,
-                    Event::Prog1(_) => Ordering::Greater,
-                    Event::Netflow(_) => Ordering::Equal,
-                },
-            },
-            Ordering::Greater => Ordering::Greater,
-        }
-    }
-}
-
-impl<'a> Event<'a> {
-    #[inline]
-    fn inner(&self) -> &ExplodedEdge<'a> {
-        match self {
-            Event::Login(e) => e,
-            Event::Prog1(e) => e,
-            Event::Netflow(e) => e,
-        }
-    }
-
-    #[inline]
-    fn t(&self) -> Time {
-        self.inner().timestamp()
-    }
-}
-
-struct MergeIter<'a, I: Iterator<Item = Event<'a>>> {
-    events: I,
-    active_netflow: VecDeque<(Time, usize, ExplodedEdge<'a>)>,
-    active_prog1: VecDeque>,
-    inner_state: Option>,
-    event_count: usize,
-    window: i64,
-}
-
-impl<'a, I: Iterator<Item = Event<'a>>> MergeIter<'a, I> {
-    #[inline]
-    fn new(events: I, window: i64) -> Self {
-        Self {
-            events,
-            active_netflow: Default::default(),
-            active_prog1: Default::default(),
-            inner_state: None,
-            event_count: 0,
-            window,
-        }
-    }
-    #[inline]
-    fn oldest_window_t(&self) -> Option