diff --git a/examples/rust/src/bin/bench/main.rs b/examples/rust/src/bin/bench/main.rs index bf90573a71..783c6fb065 100644 --- a/examples/rust/src/bin/bench/main.rs +++ b/examples/rust/src/bin/bench/main.rs @@ -1,6 +1,5 @@ use raphtory::{ - algorithms::centrality::pagerank::unweighted_page_rank, - graph_loader::source::csv_loader::CsvLoader, prelude::*, + algorithms::centrality::pagerank::unweighted_page_rank, io::csv_loader::CsvLoader, prelude::*, }; use serde::Deserialize; use std::{ diff --git a/examples/rust/src/bin/btc/main.rs b/examples/rust/src/bin/btc/main.rs index 8415c81bda..994e336ff9 100644 --- a/examples/rust/src/bin/btc/main.rs +++ b/examples/rust/src/bin/btc/main.rs @@ -2,7 +2,7 @@ #![allow(dead_code)] use chrono::{DateTime, Utc}; -use raphtory::{graph_loader::source::csv_loader::CsvLoader, prelude::*}; +use raphtory::{io::csv_loader::CsvLoader, prelude::*}; use regex::Regex; use serde::Deserialize; use std::{ diff --git a/examples/rust/src/bin/crypto/main.rs b/examples/rust/src/bin/crypto/main.rs index a0d9725908..77d0a47091 100644 --- a/examples/rust/src/bin/crypto/main.rs +++ b/examples/rust/src/bin/crypto/main.rs @@ -5,7 +5,7 @@ use raphtory::{ pathing::temporal_reachability::temporally_reachable_nodes, }, db::api::view::*, - graph_loader::example::stable_coins::stable_coin_graph, + graph_loader::stable_coins::stable_coin_graph, }; use std::{env, time::Instant}; diff --git a/examples/rust/src/bin/hulongbay/main.rs b/examples/rust/src/bin/hulongbay/main.rs index 9ec0207363..1b44ecffb1 100644 --- a/examples/rust/src/bin/hulongbay/main.rs +++ b/examples/rust/src/bin/hulongbay/main.rs @@ -9,7 +9,7 @@ use raphtory::{ triangle_count::triangle_count, }, }, - graph_loader::source::csv_loader::CsvLoader, + io::csv_loader::CsvLoader, prelude::*, }; use regex::Regex; diff --git a/examples/rust/src/bin/lotr/main.rs b/examples/rust/src/bin/lotr/main.rs index aa4611cb12..d96fe32443 100644 --- a/examples/rust/src/bin/lotr/main.rs +++ b/examples/rust/src/bin/lotr/main.rs @@ -1,6 +1,6 @@ use raphtory::{ algorithms::pathing::temporal_reachability::temporally_reachable_nodes, - graph_loader::source::csv_loader::CsvLoader, prelude::*, + io::csv_loader::CsvLoader, prelude::*, }; use serde::Deserialize; use std::{ diff --git a/examples/rust/src/bin/pokec/main.rs b/examples/rust/src/bin/pokec/main.rs index 63158b0d27..2be1a7737d 100644 --- a/examples/rust/src/bin/pokec/main.rs +++ b/examples/rust/src/bin/pokec/main.rs @@ -3,7 +3,7 @@ use raphtory::{ centrality::pagerank::unweighted_page_rank, components::weakly_connected_components, }, db::{api::mutation::AdditionOps, graph::graph::Graph}, - graph_loader::source::csv_loader::CsvLoader, + io::csv_loader::CsvLoader, prelude::*, }; use serde::Deserialize; diff --git a/python/tests/test_graphdb.py b/python/tests/test_graphdb.py index fe732bd179..402fdd5099 100644 --- a/python/tests/test_graphdb.py +++ b/python/tests/test_graphdb.py @@ -2,6 +2,8 @@ import math import sys import random +import re + import pandas as pd import pandas.core.frame import pytest @@ -1435,6 +1437,18 @@ def test_layer(): assert g.exclude_layers(["layer1", "layer2"]).count_edges() == 1 assert g.exclude_layer("layer2").count_edges() == 4 + with pytest.raises( + Exception, + match=re.escape("Invalid layer: test_layer. Valid layers: _default, layer1, layer2"), + ): + g.layers(["test_layer"]) + + with pytest.raises( + Exception, + match=re.escape("Invalid layer: test_layer. Valid layers: _default, layer1, layer2"), + ): + g.edge(1, 2).layers(["test_layer"]) + def test_layer_node(): g = Graph() diff --git a/python/tests/test_load_from_pandas.py b/python/tests/test_load_from_pandas.py index 0b2fb64c56..e966d1a21b 100644 --- a/python/tests/test_load_from_pandas.py +++ b/python/tests/test_load_from_pandas.py @@ -624,7 +624,7 @@ def assertions_layers_in_df(g): assert g.layers(["layer 3"]).edges.src.id.collect() == [3] with pytest.raises( Exception, - match=re.escape("Invalid layer test_layer."), + match=re.escape("Invalid layer: test_layer. Valid layers: _default, layer 1, layer 2, layer 3, layer 4, layer 5"), ): g.layers(["test_layer"]) diff --git a/python/tests/test_load_from_parquet.py b/python/tests/test_load_from_parquet.py new file mode 100644 index 0000000000..6785df097a --- /dev/null +++ b/python/tests/test_load_from_parquet.py @@ -0,0 +1,448 @@ +import os +import re +import tempfile + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from raphtory import Graph, PersistentGraph + + +@pytest.fixture(scope="session") +def parquet_files(): + dirname = tempfile.TemporaryDirectory() + nodes_parquet_file_path = os.path.join(dirname.name, "parquet", "nodes.parquet") + edges_parquet_file_path = os.path.join(dirname.name, "parquet", "edges.parquet") + edge_deletions_parquet_file_path = os.path.join(dirname.name, "parquet", "edges_deletions.parquet") + + os.makedirs(os.path.dirname(nodes_parquet_file_path), exist_ok=True) + + data = { + "id": [1, 2, 3, 4, 5, 6], + "name": ["Alice", "Bob", "Carol", "Dave", "Eve", "Frank"], + "time": [1, 2, 3, 4, 5, 6], + "type": ["Person 1", "Person 2", "Person 3", "Person 4", "Person 5", "Person 6"], + "node_type": ["p", "p", "p", "p", "p", "p"], + } + + table = pa.table(data) + pq.write_table(table, nodes_parquet_file_path) + print("""Created nodes.parquet at loc = {}""".format(nodes_parquet_file_path)) + + data = { + "src": [1, 2, 3, 4, 5], + "dst": [2, 3, 4, 5, 6], + "time": [1, 2, 3, 4, 5], + "weight": [1.0, 2.0, 3.0, 4.0, 5.0], + "marbles": ["red", "blue", "green", "yellow", "purple"], + "marbles_const": ["red", "blue", "green", "yellow", "purple"], + "layers": ["layer 1", "layer 2", "layer 3", "layer 4", "layer 5"], + } + + table = pa.table(data) + pq.write_table(table, edges_parquet_file_path) + print("""Created edges.parquet at loc = {}""".format(edges_parquet_file_path)) + + data = { + "src": [3, 4], + "dst": [4, 5], + "time": [6, 7], + } + + table = pa.table(data) + pq.write_table(table, edge_deletions_parquet_file_path) + print("""Created edges_deletions.parquet at loc = {}""".format(edge_deletions_parquet_file_path)) + + yield nodes_parquet_file_path, edges_parquet_file_path, edge_deletions_parquet_file_path + + # Cleanup the temporary directory after tests + dirname.cleanup() + + +def assert_expected_nodes(g): + expected_node_ids = [1, 2, 3, 4, 5, 6] + expected_nodes = [ + (1, "Alice"), + (2, "Bob"), + (3, "Carol"), + (4, "Dave"), + (5, "Eve"), + (6, "Frank"), + ] + nodes = [] + for v in g.nodes: + name = v["name"] + nodes.append((v.id, name)) + assert g.nodes.id.collect() == expected_node_ids + assert nodes == expected_nodes + + +def assert_expected_edges(g): + expected_edges = [ + (1, 2, 1.0, "red"), + (2, 3, 2.0, "blue"), + (3, 4, 3.0, "green"), + (4, 5, 4.0, "yellow"), + (5, 6, 5.0, "purple"), + ] + edges = [] + for e in g.edges: + weight = e["weight"] + marbles = e["marbles"] + edges.append((e.src.id, e.dst.id, weight, marbles)) + assert edges == expected_edges + + +def assert_expected_node_types(g): + assert g.nodes.node_type == [ + "p", + "p", + "p", + "p", + "p", + "p", + ] + + +def assert_expected_node_property_tag(g): + assert g.nodes.properties.constant.get("tag").collect() == [ + "test_tag", + "test_tag", + "test_tag", + "test_tag", + "test_tag", + "test_tag", + ] + + +def assert_expected_node_property_type(g): + assert g.nodes.properties.constant.get("type").collect() == [ + "Person 1", + "Person 2", + "Person 3", + "Person 4", + "Person 5", + "Person 6", + ] + + +def assert_expected_node_property_dept(g): + assert g.nodes.properties.constant.get("dept").collect() == [ + "Sales", + "Sales", + "Sales", + "Sales", + "Sales", + "Sales", + ] + + +def assert_expected_edge_properties(g): + assert g.layers( + ["layer 1", "layer 2", "layer 3"] + ).edges.properties.constant.get("marbles_const").collect() == [ + {"layer 1": "red"}, + {"layer 2": "blue"}, + {"layer 3": "green"}, + ] + assert g.edges.properties.constant.get("tag").collect() == [ + {"layer 1": "test_tag"}, + {"layer 2": "test_tag"}, + {"layer 3": "test_tag"}, + {"layer 4": "test_tag"}, + {"layer 5": "test_tag"}, + ] + + +def assert_expected_edge_properties_test_layer(g): + assert g.edges.properties.constant.get("type").collect() == [ + {"test_layer": "Edge"}, + {"test_layer": "Edge"}, + {"test_layer": "Edge"}, + {"test_layer": "Edge"}, + {"test_layer": "Edge"}, + ] + assert g.edges.properties.constant.get("tag").collect() == [ + {"test_layer": "test_tag"}, + {"test_layer": "test_tag"}, + {"test_layer": "test_tag"}, + {"test_layer": "test_tag"}, + {"test_layer": "test_tag"}, + ] + assert g.edges.properties.constant.get("tag").collect() == [ + {"test_layer": "test_tag"}, + {"test_layer": "test_tag"}, + {"test_layer": "test_tag"}, + {"test_layer": "test_tag"}, + {"test_layer": "test_tag"}, + ] + + +def assert_expected_layers(g): + assert g.unique_layers == ["_default", "layer 1", "layer 2", "layer 3", "layer 4", "layer 5"] + assert g.layers(["layer 1"]).edges.src.id.collect() == [1] + assert g.layers(["layer 1", "layer 2"]).edges.src.id.collect() == [1, 2] + assert g.layers(["layer 1", "layer 2", "layer 3"]).edges.src.id.collect() == [1, 2, 3] + assert g.layers(["layer 1", "layer 4", "layer 5"]).edges.src.id.collect() == [1, 4, 5] + with pytest.raises( + Exception, + match=re.escape("Invalid layer: test_layer. Valid layers: _default, layer 1, layer 2, layer 3, layer 4, layer 5"), + ): + g.layers(["test_layer"]) + + +def assert_expected_test_layer(g): + assert g.unique_layers == ["_default", "test_layer"] + assert g.layers(["test_layer"]).edges.src.id.collect() == [1, 2, 3, 4, 5] + + +def test_load_from_parquet_graphs(parquet_files): + nodes_parquet_file_path, edges_parquet_file_path, edges_deletions_parquet_file_path = parquet_files + + g = Graph.load_from_parquet( + edge_parquet_path=edges_parquet_file_path, + edge_src="src", + edge_dst="dst", + edge_time="time", + edge_properties=["weight", "marbles"], + node_parquet_path=nodes_parquet_file_path, + node_id="id", + node_time="time", + node_properties=["name"], + node_type="node_type", + ) + assert_expected_nodes(g) + assert_expected_edges(g) + + g = Graph() + g.load_nodes_from_parquet( + parquet_path=nodes_parquet_file_path, + id="id", + time="time", + node_type="node_type", + properties=["name"] + ) + g.load_edges_from_parquet( + parquet_path=edges_parquet_file_path, + src="src", + dst="dst", + time="time", + properties=["weight", "marbles"], + layer="layers" + ) + assert_expected_nodes(g) + assert_expected_edges(g) + assert_expected_layers(g) + + g.load_node_props_from_parquet( + parquet_path=nodes_parquet_file_path, + id="id", + const_properties=["type"], + shared_const_properties={"tag": "test_tag"}, + ) + assert_expected_node_property_tag(g) + assert_expected_node_property_type(g) + + g.load_edge_props_from_parquet( + parquet_path=edges_parquet_file_path, + src="src", + dst="dst", + const_properties=["marbles_const"], + shared_const_properties={"tag": "test_tag"}, + layer="layers", + ) + assert_expected_edge_properties(g) + assert_expected_layers(g) + + g = Graph() + g.load_nodes_from_parquet( + parquet_path=nodes_parquet_file_path, + id="id", + time="time", + node_type="node_type", + properties=["name"], + shared_const_properties={"tag": "test_tag"}, + ) + assert_expected_node_types(g) + assert_expected_node_property_tag(g) + + g = Graph() + g.load_edges_from_parquet( + parquet_path=edges_parquet_file_path, + src="src", + dst="dst", + time="time", + properties=["weight", "marbles"], + const_properties=["marbles_const"], + shared_const_properties={"type": "Edge", "tag": "test_tag"}, + layer="test_layer", + layer_in_df=False, + ) + assert_expected_edge_properties_test_layer(g) + assert_expected_test_layer(g) + + g = Graph.load_from_parquet( + edge_parquet_path=edges_parquet_file_path, + edge_src="src", + edge_dst="dst", + edge_time="time", + edge_layer="test_layer", + layer_in_df=False, + node_parquet_path=nodes_parquet_file_path, + node_id="id", + node_time="time", + node_properties=["name"], + node_shared_const_properties={"dept": "Sales"}, + ) + assert_expected_test_layer(g) + assert_expected_node_property_dept(g) + + g = Graph.load_from_parquet( + edge_parquet_path=edges_parquet_file_path, + edge_src="src", + edge_dst="dst", + edge_time="time", + edge_layer="layers", + node_parquet_path=nodes_parquet_file_path, + node_id="id", + node_time="time", + node_properties=["name"], + node_const_properties=["type"], + ) + assert_expected_node_property_type(g) + assert_expected_layers(g) + + +def test_load_from_parquet_persistent_graphs(parquet_files): + nodes_parquet_file_path, edges_parquet_file_path, edges_deletions_parquet_file_path = parquet_files + + g = PersistentGraph.load_from_parquet( + edge_parquet_path=edges_parquet_file_path, + edge_src="src", + edge_dst="dst", + edge_time="time", + edge_properties=["weight", "marbles"], + node_parquet_path=nodes_parquet_file_path, + node_id="id", + node_time="time", + node_properties=["name"], + node_type="node_type", + ) + assert_expected_nodes(g) + assert_expected_edges(g) + + g = PersistentGraph() + g.load_nodes_from_parquet( + parquet_path=nodes_parquet_file_path, + id="id", + time="time", + node_type="node_type", + properties=["name"] + ) + g.load_edges_from_parquet( + parquet_path=edges_parquet_file_path, + src="src", + dst="dst", + time="time", + properties=["weight", "marbles"], + layer="layers" + ) + assert_expected_nodes(g) + assert_expected_edges(g) + assert_expected_layers(g) + + g.load_node_props_from_parquet( + parquet_path=nodes_parquet_file_path, + id="id", + const_properties=["type"], + shared_const_properties={"tag": "test_tag"}, + ) + assert_expected_node_property_tag(g) + assert_expected_node_property_type(g) + + g.load_edge_props_from_parquet( + parquet_path=edges_parquet_file_path, + src="src", + dst="dst", + const_properties=["marbles_const"], + shared_const_properties={"tag": "test_tag"}, + layer="layers", + ) + assert_expected_edge_properties(g) + assert_expected_layers(g) + + g = PersistentGraph() + g.load_nodes_from_parquet( + parquet_path=nodes_parquet_file_path, + id="id", + time="time", + node_type="node_type", + properties=["name"], + shared_const_properties={"tag": "test_tag"}, + ) + assert_expected_node_types(g) + assert_expected_node_property_tag(g) + + g = PersistentGraph() + g.load_edges_from_parquet( + parquet_path=edges_parquet_file_path, + src="src", + dst="dst", + time="time", + properties=["weight", "marbles"], + const_properties=["marbles_const"], + shared_const_properties={"type": "Edge", "tag": "test_tag"}, + layer="test_layer", + layer_in_df=False, + ) + assert_expected_edge_properties_test_layer(g) + assert_expected_test_layer(g) + + g = Graph.load_from_parquet( + edge_parquet_path=edges_parquet_file_path, + edge_src="src", + edge_dst="dst", + edge_time="time", + edge_layer="test_layer", + layer_in_df=False, + node_parquet_path=nodes_parquet_file_path, + node_id="id", + node_time="time", + node_properties=["name"], + node_shared_const_properties={"dept": "Sales"}, + ) + assert_expected_test_layer(g) + assert_expected_node_property_dept(g) + + g = PersistentGraph.load_from_parquet( + edge_parquet_path=edges_parquet_file_path, + edge_src="src", + edge_dst="dst", + edge_time="time", + edge_layer="layers", + node_parquet_path=nodes_parquet_file_path, + node_id="id", + node_time="time", + node_properties=["name"], + node_const_properties=["type"], + ) + assert_expected_node_property_type(g) + assert_expected_layers(g) + + g = PersistentGraph() + g.load_edges_from_parquet( + parquet_path=edges_parquet_file_path, + src="src", + dst="dst", + time="time", + ) + assert g.window(10, 12).edges.src.id.collect() == [1, 2, 3, 4, 5] + g.load_edges_deletions_from_parquet( + parquet_path=edges_deletions_parquet_file_path, + src="src", + dst="dst", + time="time" + ) + assert g.window(10, 12).edges.src.id.collect() == [1, 2, 5] + diff --git a/raphtory-benchmark/benches/algobench.rs b/raphtory-benchmark/benches/algobench.rs index b879875321..0d469c5053 100644 --- a/raphtory-benchmark/benches/algobench.rs +++ b/raphtory-benchmark/benches/algobench.rs @@ -23,7 +23,7 @@ pub fn local_triangle_count_analysis(c: &mut Criterion) { let mut group = c.benchmark_group("local_triangle_count"); group.sample_size(10); bench(&mut group, "local_triangle_count", None, |b| { - let g = raphtory::graph_loader::example::lotr_graph::lotr_graph(); + let g = raphtory::graph_loader::lotr_graph::lotr_graph(); let windowed_graph = g.window(i64::MIN, i64::MAX); b.iter(|| { @@ -42,7 +42,7 @@ pub fn local_clustering_coefficient_analysis(c: &mut Criterion) { let mut group = c.benchmark_group("local_clustering_coefficient"); bench(&mut group, "local_clustering_coefficient", None, |b| { - let g: Graph = raphtory::graph_loader::example::lotr_graph::lotr_graph(); + let g: Graph = raphtory::graph_loader::lotr_graph::lotr_graph(); b.iter(|| local_clustering_coefficient(&g, "Gandalf")) }); @@ -123,7 +123,7 @@ pub fn temporal_motifs(c: &mut Criterion) { let mut group = c.benchmark_group("temporal_motifs"); bench(&mut group, "temporal_motifs", None, |b| { - let g: Graph = raphtory::graph_loader::example::lotr_graph::lotr_graph(); + let g: Graph = raphtory::graph_loader::lotr_graph::lotr_graph(); b.iter(|| global_temporal_three_node_motif(&g, 100, None)) }); diff --git a/raphtory-benchmark/benches/arrow_algobench.rs b/raphtory-benchmark/benches/arrow_algobench.rs index 3f9b7439c9..a1b7d913ad 100644 --- a/raphtory-benchmark/benches/arrow_algobench.rs +++ b/raphtory-benchmark/benches/arrow_algobench.rs @@ -37,7 +37,7 @@ pub fn local_triangle_count_analysis(c: &mut Criterion) { let mut group = c.benchmark_group("local_triangle_count"); group.sample_size(10); bench(&mut group, "local_triangle_count", None, |b| { - let g = raphtory::graph_loader::example::lotr_graph::lotr_graph(); + let g = raphtory::graph_loader::lotr_graph::lotr_graph(); let test_dir = TempDir::new().unwrap(); let g = g.persist_as_disk_graph(test_dir.path()).unwrap(); let windowed_graph = g.window(i64::MIN, i64::MAX); diff --git a/raphtory-benchmark/benches/base.rs b/raphtory-benchmark/benches/base.rs index 5968e9f44b..94badb81e7 100644 --- a/raphtory-benchmark/benches/base.rs +++ b/raphtory-benchmark/benches/base.rs @@ -1,7 +1,7 @@ use crate::common::{bootstrap_graph, run_large_ingestion_benchmarks}; use common::run_graph_ops_benches; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; -use raphtory::{graph_loader::example::lotr_graph::lotr_graph, prelude::*}; +use raphtory::{graph_loader::lotr_graph::lotr_graph, prelude::*}; mod common; diff --git a/raphtory-benchmark/src/main.rs b/raphtory-benchmark/src/main.rs index ca535c5748..df489da776 100644 --- a/raphtory-benchmark/src/main.rs +++ b/raphtory-benchmark/src/main.rs @@ -5,7 +5,8 @@ use raphtory::{ algorithms::{ centrality::pagerank::unweighted_page_rank, components::weakly_connected_components, }, - graph_loader::{fetch_file, source::csv_loader::CsvLoader}, + graph_loader::fetch_file, + io::csv_loader::CsvLoader, prelude::{AdditionOps, Graph, GraphViewOps, NodeViewOps, NO_PROPS}, }; use std::{ @@ -16,10 +17,10 @@ use std::{ }; #[derive(Parser, Debug)] -#[command(author, version, about, long_about = None )] +#[command(author, version, about, long_about = None)] struct Args { /// Set if the file has a header, default is False - #[arg(long, action=ArgAction::SetTrue)] + #[arg(long, action = ArgAction::SetTrue)] header: bool, /// Delimiter of the csv file @@ -43,11 +44,11 @@ struct Args { time_column: i32, /// Download default files - #[arg(long, action=ArgAction::SetTrue)] + #[arg(long, action = ArgAction::SetTrue)] download: bool, /// Debug to print more info to the screen - #[arg(long, action=ArgAction::SetTrue)] + #[arg(long, action = ArgAction::SetTrue)] debug: bool, /// Set the number of locks for the node and edge storage diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index 3034f97cd5..71419c6c06 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -104,10 +104,10 @@ io = [ # Enables generating the pyo3 python bindings python = [ "io", + "arrow", "dep:pyo3", "dep:num", "dep:display-error-chain", - "dep:polars-arrow", "polars-arrow?/compute", "raphtory-api/python", "dep:kdam", @@ -119,10 +119,9 @@ search = ["dep:tantivy"] vectors = ["dep:futures-util", "dep:async-trait", "dep:async-openai"] # storage storage = [ + "arrow", "pometry-storage", - "dep:polars-arrow", "dep:polars-utils", - "dep:polars-parquet", "dep:memmap2", "dep:ahash", "dep:tempfile", @@ -131,5 +130,9 @@ storage = [ "dep:thread_local", "polars-arrow?/io_ipc", "polars-arrow?/arrow_rs", - "polars-parquet?/compression", +] +arrow = [ + "dep:polars-arrow", + "dep:polars-parquet", + "polars-parquet?/compression" ] diff --git a/raphtory/resources/test/test_data.parquet b/raphtory/resources/test/test_data.parquet new file mode 100644 index 0000000000..59f63d062b Binary files /dev/null and b/raphtory/resources/test/test_data.parquet differ diff --git a/raphtory/src/algorithms/community_detection/louvain.rs b/raphtory/src/algorithms/community_detection/louvain.rs index 7383f114d9..a2c7c39724 100644 --- a/raphtory/src/algorithms/community_detection/louvain.rs +++ b/raphtory/src/algorithms/community_detection/louvain.rs @@ -133,7 +133,7 @@ mod test { #[cfg(feature = "io")] #[test] fn lfr_test() { - use crate::graph_loader::source::csv_loader::CsvLoader; + use crate::io::csv_loader::CsvLoader; use serde::{Deserialize, Serialize}; use std::path::PathBuf; diff --git a/raphtory/src/core/entities/graph/tgraph.rs b/raphtory/src/core/entities/graph/tgraph.rs index a80db59ab3..3f1a7625ab 100644 --- a/raphtory/src/core/entities/graph/tgraph.rs +++ b/raphtory/src/core/entities/graph/tgraph.rs @@ -118,6 +118,15 @@ impl Default for InternalGraph { } impl TemporalGraph { + fn get_valid_layers(edge_meta: &Arc) -> Vec { + edge_meta + .layer_meta() + .get_keys() + .iter() + .map(|x| x.to_string()) + .collect::>() + } + pub(crate) fn num_layers(&self) -> usize { self.edge_meta.layer_meta().len() } @@ -148,15 +157,21 @@ impl TemporalGraph { Layer::Default => Ok(LayerIds::One(0)), Layer::One(id) => match self.edge_meta.get_layer_id(&id) { Some(id) => Ok(LayerIds::One(id)), - None => Err(GraphError::InvalidLayer(id.to_string())), + None => Err(GraphError::invalid_layer( + id.to_string(), + Self::get_valid_layers(&self.edge_meta), + )), }, Layer::Multiple(ids) => { let mut new_layers = ids .iter() .map(|id| { - self.edge_meta - .get_layer_id(id) - .ok_or_else(|| GraphError::InvalidLayer(id.to_string())) + self.edge_meta.get_layer_id(id).ok_or_else(|| { + GraphError::invalid_layer( + id.to_string(), + Self::get_valid_layers(&self.edge_meta), + ) + }) }) .collect::, GraphError>>()?; let num_layers = self.num_layers(); diff --git a/raphtory/src/core/utils/errors.rs b/raphtory/src/core/utils/errors.rs index 78c3ff7744..d2a6a2dd54 100644 --- a/raphtory/src/core/utils/errors.rs +++ b/raphtory/src/core/utils/errors.rs @@ -1,4 +1,6 @@ use crate::core::{utils::time::error::ParseTimeError, Prop, PropType}; +#[cfg(feature = "arrow")] +use polars_arrow::legacy::error; use raphtory_api::core::storage::arc_str::ArcStr; #[cfg(feature = "search")] use tantivy; @@ -7,6 +9,11 @@ use tantivy::query::QueryParserError; #[derive(thiserror::Error, Debug)] pub enum GraphError { + #[cfg(feature = "arrow")] + #[error("Arrow error: {0}")] + Arrow(#[from] error::PolarsError), + #[error("Invalid path = {0}")] + InvalidPath(String), #[error("Graph error occurred")] UnsupportedDataType, #[error("Graph already exists by name = {name}")] @@ -62,8 +69,11 @@ pub enum GraphError { // wasm #[error("Node is not String or Number")] NodeIdNotStringOrNumber, - #[error("Invalid layer {0}.")] - InvalidLayer(String), + #[error("Invalid layer: {invalid_layer}. Valid layers: {valid_layers}")] + InvalidLayer { + invalid_layer: String, + valid_layers: String, + }, #[error("Layer {layer} does not exist for edge ({src}, {dst})")] InvalidEdgeLayer { layer: String, @@ -121,6 +131,16 @@ pub enum GraphError { TimeAPIError, } +impl GraphError { + pub fn invalid_layer(invalid_layer: String, valid_layers: Vec) -> Self { + let valid_layers = valid_layers.join(", "); + GraphError::InvalidLayer { + invalid_layer, + valid_layers, + } + } +} + #[derive(thiserror::Error, Debug, PartialEq)] pub enum MutateGraphError { #[error("Create node '{node_id}' first before adding static properties to it")] diff --git a/raphtory/src/db/graph/edge.rs b/raphtory/src/db/graph/edge.rs index 901f02c7d1..a1777094d4 100644 --- a/raphtory/src/db/graph/edge.rs +++ b/raphtory/src/db/graph/edge.rs @@ -117,7 +117,9 @@ impl<'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'graph>> BaseEdgeViewOps< type BaseGraph = G; type Graph = GH; - type ValueType =T where T: 'graph; + type ValueType = T + where + T: 'graph; type PropType = Self; type Nodes = NodeView; type Exploded = Edges<'graph, G, GH>; @@ -162,6 +164,10 @@ impl<'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'graph>> BaseEdgeViewOps< } impl EdgeView { + fn get_valid_layers(graph: &G) -> Vec { + graph.unique_layers().map(|l| l.0.to_string()).collect() + } + fn resolve_layer(&self, layer: Option<&str>, create: bool) -> Result { match layer { Some(name) => match self.edge.layer() { @@ -169,14 +175,22 @@ impl .graph .get_layer_id(name) .filter(|id| id == l_id) - .ok_or_else(|| GraphError::InvalidLayer(name.to_owned())), + .ok_or_else(|| { + GraphError::invalid_layer( + name.to_owned(), + Self::get_valid_layers(&self.graph), + ) + }), None => { if create { Ok(self.graph.resolve_layer(layer)) } else { self.graph .get_layer_id(name) - .ok_or(GraphError::InvalidLayer(name.to_owned())) + .ok_or(GraphError::invalid_layer( + name.to_owned(), + Self::get_valid_layers(&self.graph), + )) } } }, diff --git a/raphtory/src/disk_graph/graph_impl/layer_ops.rs b/raphtory/src/disk_graph/graph_impl/layer_ops.rs index fe9e9a6cf5..a0a6bf1b4f 100644 --- a/raphtory/src/disk_graph/graph_impl/layer_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/layer_ops.rs @@ -1,10 +1,20 @@ +use super::DiskGraph; use crate::{ core::{entities::LayerIds, utils::errors::GraphError}, db::api::view::internal::InternalLayerOps, prelude::Layer, }; +use itertools::Itertools; +use pometry_storage::graph::TemporalGraph; +use std::sync::Arc; -use super::DiskGraph; +fn get_valid_layers(graph: &Arc) -> Vec { + graph + .layer_names() + .into_iter() + .map(|x| x.clone()) + .collect_vec() +} impl InternalLayerOps for DiskGraph { fn layer_ids(&self) -> &LayerIds { @@ -20,10 +30,9 @@ impl InternalLayerOps for DiskGraph { Layer::All => Ok(LayerIds::All), Layer::Default => Ok(LayerIds::One(0)), Layer::One(name) => { - let id = self - .inner - .find_layer_id(&name) - .ok_or_else(|| GraphError::InvalidLayer(name.to_string()))?; + let id = self.inner.find_layer_id(&name).ok_or_else(|| { + GraphError::invalid_layer(name.to_string(), get_valid_layers(&self.inner)) + })?; Ok(LayerIds::One(id)) } Layer::None => Ok(LayerIds::None), @@ -31,9 +40,12 @@ impl InternalLayerOps for DiskGraph { let ids = names .iter() .map(|name| { - self.inner - .find_layer_id(name) - .ok_or_else(|| GraphError::InvalidLayer(name.to_string())) + self.inner.find_layer_id(name).ok_or_else(|| { + GraphError::invalid_layer( + name.to_string(), + get_valid_layers(&self.inner), + ) + }) }) .collect::, _>>()?; Ok(LayerIds::Multiple(ids.into())) diff --git a/raphtory/src/graph_loader/example/company_house.rs b/raphtory/src/graph_loader/company_house.rs similarity index 98% rename from raphtory/src/graph_loader/example/company_house.rs rename to raphtory/src/graph_loader/company_house.rs index 0a585c9908..6167a5d76c 100644 --- a/raphtory/src/graph_loader/example/company_house.rs +++ b/raphtory/src/graph_loader/company_house.rs @@ -1,4 +1,4 @@ -use crate::{graph_loader::source::csv_loader::CsvLoader, prelude::*}; +use crate::{io::csv_loader::CsvLoader, prelude::*}; use chrono::DateTime; use serde::Deserialize; use std::{fs, path::PathBuf, time::Instant}; diff --git a/raphtory/src/graph_loader/example/mod.rs b/raphtory/src/graph_loader/example/mod.rs deleted file mode 100644 index 6a3b542df1..0000000000 --- a/raphtory/src/graph_loader/example/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub mod company_house; -pub mod karate_club; -pub mod lotr_graph; -pub mod neo4j_examples; -pub mod reddit_hyperlinks; -pub mod stable_coins; -pub mod sx_superuser_graph; diff --git a/raphtory/src/graph_loader/example/karate_club.rs b/raphtory/src/graph_loader/karate_club.rs similarity index 100% rename from raphtory/src/graph_loader/example/karate_club.rs rename to raphtory/src/graph_loader/karate_club.rs diff --git a/raphtory/src/graph_loader/example/lotr_graph.rs b/raphtory/src/graph_loader/lotr_graph.rs similarity index 93% rename from raphtory/src/graph_loader/example/lotr_graph.rs rename to raphtory/src/graph_loader/lotr_graph.rs index b923d0a09a..8f5eafd98a 100644 --- a/raphtory/src/graph_loader/example/lotr_graph.rs +++ b/raphtory/src/graph_loader/lotr_graph.rs @@ -13,7 +13,7 @@ //! //! Example: //! ```rust -//! use raphtory::graph_loader::example::lotr_graph::lotr_graph; +//! use raphtory::graph_loader::lotr_graph::lotr_graph; //! use raphtory::prelude::*; //! //! let graph = lotr_graph(); @@ -21,10 +21,7 @@ //! println!("The graph has {:?} nodes", graph.count_nodes()); //! println!("The graph has {:?} edges", graph.count_edges()); //! ``` -use crate::{ - graph_loader::{fetch_file, source::csv_loader::CsvLoader}, - prelude::*, -}; +use crate::{graph_loader::fetch_file, io::csv_loader::CsvLoader, prelude::*}; use serde::Deserialize; use std::path::PathBuf; diff --git a/raphtory/src/graph_loader/mod.rs b/raphtory/src/graph_loader/mod.rs index 76ec92996a..409c37518c 100644 --- a/raphtory/src/graph_loader/mod.rs +++ b/raphtory/src/graph_loader/mod.rs @@ -10,7 +10,7 @@ //! ```rust //! use raphtory::algorithms::metrics::degree::average_degree; //! use raphtory::prelude::*; -//! use raphtory::graph_loader::example::lotr_graph::lotr_graph; +//! use raphtory::graph_loader::lotr_graph::lotr_graph; //! //! let graph = lotr_graph(); //! @@ -32,7 +32,7 @@ //! ```no_run //! use std::time::Instant; //! use serde::Deserialize; -//! use raphtory::graph_loader::source::csv_loader::CsvLoader; +//! use raphtory::io::csv_loader::CsvLoader; //! use raphtory::prelude::*; //! //! let data_dir = "/tmp/lotr.csv"; @@ -105,8 +105,13 @@ use std::{ }; use zip::read::ZipArchive; -pub mod example; -pub mod source; +pub mod company_house; +pub mod karate_club; +pub mod lotr_graph; +pub mod neo4j_examples; +pub mod reddit_hyperlinks; +pub mod stable_coins; +pub mod sx_superuser_graph; pub fn fetch_file( name: &str, @@ -177,13 +182,13 @@ mod graph_loader_test { #[test] fn test_lotr_load_graph() { - let g = crate::graph_loader::example::lotr_graph::lotr_graph(); + let g = crate::graph_loader::lotr_graph::lotr_graph(); assert_eq!(g.count_edges(), 701); } #[test] fn test_graph_at() { - let g = crate::graph_loader::example::lotr_graph::lotr_graph(); + let g = crate::graph_loader::lotr_graph::lotr_graph(); let g_at_empty = g.at(1); let g_astart = g.at(7059); @@ -196,7 +201,7 @@ mod graph_loader_test { #[test] fn test_karate_graph() { - let g = crate::graph_loader::example::karate_club::karate_club_graph(); + let g = crate::graph_loader::karate_club::karate_club_graph(); assert_eq!(g.count_nodes(), 34); assert_eq!(g.count_edges(), 155); } @@ -205,8 +210,8 @@ mod graph_loader_test { fn db_lotr() { let g = Graph::new(); - let data_dir = crate::graph_loader::example::lotr_graph::lotr_file() - .expect("Failed to get lotr.csv file"); + let data_dir = + crate::graph_loader::lotr_graph::lotr_file().expect("Failed to get lotr.csv file"); fn parse_record(rec: &StringRecord) -> Option<(String, String, i64)> { let src = rec.get(0).and_then(|s| s.parse::().ok())?; @@ -244,7 +249,7 @@ mod graph_loader_test { #[test] fn test_all_degrees_window() { - let g = crate::graph_loader::example::lotr_graph::lotr_graph(); + let g = crate::graph_loader::lotr_graph::lotr_graph(); assert_eq!(g.count_edges(), 701); assert_eq!(g.node("Gandalf").unwrap().degree(), 49); @@ -263,7 +268,7 @@ mod graph_loader_test { #[test] fn test_all_neighbours_window() { - let g = crate::graph_loader::example::lotr_graph::lotr_graph(); + let g = crate::graph_loader::lotr_graph::lotr_graph(); assert_eq!(g.count_edges(), 701); assert_eq!(g.node("Gandalf").unwrap().neighbours().iter().count(), 49); @@ -316,7 +321,7 @@ mod graph_loader_test { #[test] fn test_all_edges_window() { - let g = crate::graph_loader::example::lotr_graph::lotr_graph(); + let g = crate::graph_loader::lotr_graph::lotr_graph(); assert_eq!(g.count_edges(), 701); assert_eq!(g.node("Gandalf").unwrap().edges().iter().count(), 59); diff --git a/raphtory/src/graph_loader/example/neo4j_examples.rs b/raphtory/src/graph_loader/neo4j_examples.rs similarity index 96% rename from raphtory/src/graph_loader/example/neo4j_examples.rs rename to raphtory/src/graph_loader/neo4j_examples.rs index 1e661fbc81..112aaef70c 100644 --- a/raphtory/src/graph_loader/example/neo4j_examples.rs +++ b/raphtory/src/graph_loader/neo4j_examples.rs @@ -1,6 +1,6 @@ use crate::{ db::{api::mutation::AdditionOps, graph::graph as rap}, - graph_loader::source::neo4j_loader::Neo4JConnection, + io::neo4j_loader::Neo4JConnection, prelude::{IntoProp, NO_PROPS}, }; use neo4rs::*; diff --git a/raphtory/src/graph_loader/example/reddit_hyperlinks.rs b/raphtory/src/graph_loader/reddit_hyperlinks.rs similarity index 97% rename from raphtory/src/graph_loader/example/reddit_hyperlinks.rs rename to raphtory/src/graph_loader/reddit_hyperlinks.rs index 9ac2d78258..350f3600d0 100644 --- a/raphtory/src/graph_loader/example/reddit_hyperlinks.rs +++ b/raphtory/src/graph_loader/reddit_hyperlinks.rs @@ -29,7 +29,7 @@ //! //! Example: //! ```no_run -//! use raphtory::graph_loader::example::reddit_hyperlinks::reddit_graph; +//! use raphtory::graph_loader::reddit_hyperlinks::reddit_graph; //! use raphtory::prelude::*; //! //! let graph = reddit_graph(120, false); @@ -158,7 +158,7 @@ pub fn generate_reddit_graph(path: PathBuf) -> Graph { mod reddit_test { use crate::{ db::api::view::*, - graph_loader::example::reddit_hyperlinks::{reddit_file, reddit_graph}, + graph_loader::reddit_hyperlinks::{reddit_file, reddit_graph}, }; #[test] diff --git a/raphtory/src/graph_loader/source/mod.rs b/raphtory/src/graph_loader/source/mod.rs deleted file mode 100644 index ea9fc4dc61..0000000000 --- a/raphtory/src/graph_loader/source/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod csv_loader; -pub mod json_loader; -pub mod neo4j_loader; diff --git a/raphtory/src/graph_loader/example/stable_coins.rs b/raphtory/src/graph_loader/stable_coins.rs similarity index 98% rename from raphtory/src/graph_loader/example/stable_coins.rs rename to raphtory/src/graph_loader/stable_coins.rs index d221770392..ccb15899bc 100644 --- a/raphtory/src/graph_loader/example/stable_coins.rs +++ b/raphtory/src/graph_loader/stable_coins.rs @@ -1,5 +1,6 @@ use crate::{ - graph_loader::{fetch_file, source::csv_loader::CsvLoader, unzip_file}, + graph_loader::{fetch_file, unzip_file}, + io::csv_loader::CsvLoader, prelude::*, }; use chrono::DateTime; diff --git a/raphtory/src/graph_loader/example/sx_superuser_graph.rs b/raphtory/src/graph_loader/sx_superuser_graph.rs similarity index 92% rename from raphtory/src/graph_loader/example/sx_superuser_graph.rs rename to raphtory/src/graph_loader/sx_superuser_graph.rs index 503046f336..4b7887c4d3 100644 --- a/raphtory/src/graph_loader/example/sx_superuser_graph.rs +++ b/raphtory/src/graph_loader/sx_superuser_graph.rs @@ -36,7 +36,7 @@ //! //! Example: //! ```no_run -//! use raphtory::graph_loader::example::sx_superuser_graph::sx_superuser_graph; +//! use raphtory::graph_loader::sx_superuser_graph::sx_superuser_graph; //! use raphtory::prelude::*; //! //! let graph = sx_superuser_graph().unwrap(); @@ -45,10 +45,7 @@ //! println!("The graph has {:?} edges", graph.count_edges()); //! ``` -use crate::{ - graph_loader::{fetch_file, source::csv_loader::CsvLoader}, - prelude::*, -}; +use crate::{graph_loader::fetch_file, io::csv_loader::CsvLoader, prelude::*}; use serde::Deserialize; use std::path::PathBuf; @@ -90,7 +87,7 @@ pub fn sx_superuser_graph() -> Result> { #[cfg(test)] mod sx_superuser_test { - use crate::graph_loader::example::sx_superuser_graph::{sx_superuser_file, sx_superuser_graph}; + use crate::graph_loader::sx_superuser_graph::{sx_superuser_file, sx_superuser_graph}; #[test] #[ignore] // don't hit SNAP by default diff --git a/raphtory/src/io/arrow/dataframe.rs b/raphtory/src/io/arrow/dataframe.rs new file mode 100644 index 0000000000..2b8173f5e1 --- /dev/null +++ b/raphtory/src/io/arrow/dataframe.rs @@ -0,0 +1,101 @@ +use crate::core::utils::errors::GraphError; + +use polars_arrow::{ + array::{Array, PrimitiveArray, Utf8Array}, + compute::cast::{self, CastOptions}, + datatypes::{ArrowDataType as DataType, TimeUnit}, + offset::Offset, + types::NativeType, +}; + +use itertools::Itertools; + +#[derive(Debug)] +pub(crate) struct DFView { + pub(crate) names: Vec, + pub(crate) arrays: Vec>>, +} + +impl DFView { + pub(crate) fn get_inner_size(&self) -> usize { + if self.arrays.is_empty() || self.arrays[0].is_empty() { + return 0; + } + self.arrays[0][0].len() + } + + pub fn check_cols_exist(&self, cols: &[&str]) -> Result<(), GraphError> { + let non_cols: Vec<&&str> = cols + .iter() + .filter(|c| !self.names.contains(&c.to_string())) + .collect(); + if non_cols.len() > 0 { + return Err(GraphError::ColumnDoesNotExist(non_cols.iter().join(", "))); + } + + Ok(()) + } + + pub(crate) fn iter_col( + &self, + name: &str, + ) -> Option> + '_> { + let idx = self.names.iter().position(|n| n == name)?; + + let _ = (&self.arrays[0])[idx] + .as_any() + .downcast_ref::>()?; + + let iter = self.arrays.iter().flat_map(move |arr| { + let arr = &arr[idx]; + let arr = arr.as_any().downcast_ref::>().unwrap(); + arr.iter() + }); + + Some(iter) + } + + pub fn utf8(&self, name: &str) -> Option> + '_> { + let idx = self.names.iter().position(|n| n == name)?; + // test that it's actually a utf8 array + let _ = (&self.arrays[0])[idx] + .as_any() + .downcast_ref::>()?; + + let iter = self.arrays.iter().flat_map(move |arr| { + let arr = &arr[idx]; + let arr = arr.as_any().downcast_ref::>().unwrap(); + arr.iter() + }); + + Some(iter) + } + + pub fn time_iter_col(&self, name: &str) -> Option> + '_> { + let idx = self.names.iter().position(|n| n == name)?; + + let _ = (&self.arrays[0])[idx] + .as_any() + .downcast_ref::>()?; + + let iter = self.arrays.iter().flat_map(move |arr| { + let arr = &arr[idx]; + let arr = if let DataType::Timestamp(_, _) = arr.data_type() { + let array = cast::cast( + &*arr.clone(), + &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_string())), + CastOptions::default(), + ) + .unwrap(); + array + } else { + arr.clone() + }; + + let arr = arr.as_any().downcast_ref::>().unwrap(); + arr.clone().into_iter() + }); + + Some(iter) + } +} diff --git a/raphtory/src/python/graph/pandas/loaders.rs b/raphtory/src/io/arrow/df_loaders.rs similarity index 94% rename from raphtory/src/python/graph/pandas/loaders.rs rename to raphtory/src/io/arrow/df_loaders.rs index 88b3e74437..ee2003953f 100644 --- a/raphtory/src/python/graph/pandas/loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -1,17 +1,20 @@ use crate::{ - core::{entities::graph::tgraph::InternalGraph, utils::errors::GraphError}, - db::api::mutation::AdditionOps, - prelude::*, - python::graph::pandas::{ - dataframe::PretendDF, - prop_handler::{get_prop_rows, lift_layer}, + core::utils::errors::GraphError, + db::api::{ + mutation::{internal::*, AdditionOps}, + view::StaticGraphViewOps, }, + io::arrow::{dataframe::DFView, prop_handler::*}, + prelude::*, }; use kdam::tqdm; use std::{collections::HashMap, iter}; -pub(crate) fn load_nodes_from_df<'a>( - df: &'a PretendDF, +pub(crate) fn load_nodes_from_df< + 'a, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + df: &'a DFView, size: usize, node_id: &str, time: &str, @@ -20,7 +23,7 @@ pub(crate) fn load_nodes_from_df<'a>( shared_const_properties: Option>, node_type: Option<&str>, node_type_in_df: bool, - graph: &InternalGraph, + graph: &G, ) -> Result<(), GraphError> { let (prop_iter, const_prop_iter) = get_prop_rows(df, properties, const_properties)?; @@ -39,7 +42,7 @@ pub(crate) fn load_nodes_from_df<'a>( }; iter_res? } else { - Box::new(std::iter::repeat(Some(node_type))) + Box::new(iter::repeat(Some(node_type))) } } None => Box::new(iter::repeat(None)), @@ -136,8 +139,12 @@ fn extract_out_default_type(n_t: Option<&str>) -> Option<&str> { } } -pub(crate) fn load_edges_from_df<'a, S: AsRef>( - df: &'a PretendDF, +pub(crate) fn load_edges_from_df< + 'a, + S: AsRef, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + df: &'a DFView, size: usize, src: &str, dst: &str, @@ -147,7 +154,7 @@ pub(crate) fn load_edges_from_df<'a, S: AsRef>( shared_const_properties: Option>, layer: Option, layer_in_df: bool, - graph: &InternalGraph, + graph: &G, ) -> Result<(), GraphError> { let (prop_iter, const_prop_iter) = get_prop_rows(df, properties, const_properties)?; let layer = lift_layer(layer, layer_in_df, df); @@ -162,7 +169,7 @@ pub(crate) fn load_edges_from_df<'a, S: AsRef>( .zip(dst.map(|i| i.copied())) .zip(time); load_edges_from_num_iter( - &graph, + graph, size, triplets, prop_iter, @@ -180,7 +187,7 @@ pub(crate) fn load_edges_from_df<'a, S: AsRef>( .zip(dst.map(i64_opt_into_u64_opt)) .zip(time); load_edges_from_num_iter( - &graph, + graph, size, triplets, prop_iter, @@ -240,15 +247,19 @@ pub(crate) fn load_edges_from_df<'a, S: AsRef>( Ok(()) } -pub(crate) fn load_edges_deletions_from_df<'a, S: AsRef>( - df: &'a PretendDF, +pub(crate) fn load_edges_deletions_from_df< + 'a, + S: AsRef, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + DeletionOps, +>( + df: &'a DFView, size: usize, src: &str, dst: &str, time: &str, layer: Option, layer_in_df: bool, - graph: &InternalGraph, + graph: &G, ) -> Result<(), GraphError> { let layer = lift_layer(layer, layer_in_df, df); @@ -335,13 +346,16 @@ pub(crate) fn load_edges_deletions_from_df<'a, S: AsRef>( Ok(()) } -pub(crate) fn load_node_props_from_df<'a>( - df: &'a PretendDF, +pub(crate) fn load_node_props_from_df< + 'a, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + df: &'a DFView, size: usize, node_id: &str, const_properties: Option>, shared_const_properties: Option>, - graph: &InternalGraph, + graph: &G, ) -> Result<(), GraphError> { let (_, const_prop_iter) = get_prop_rows(df, None, const_properties)?; @@ -429,8 +443,12 @@ pub(crate) fn load_node_props_from_df<'a>( Ok(()) } -pub(crate) fn load_edges_props_from_df<'a, S: AsRef>( - df: &'a PretendDF, +pub(crate) fn load_edges_props_from_df< + 'a, + S: AsRef, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + df: &'a DFView, size: usize, src: &str, dst: &str, @@ -438,7 +456,7 @@ pub(crate) fn load_edges_props_from_df<'a, S: AsRef>( shared_const_properties: Option>, layer: Option, layer_in_df: bool, - graph: &InternalGraph, + graph: &G, ) -> Result<(), GraphError> { let (_, const_prop_iter) = get_prop_rows(df, None, const_properties)?; let layer = lift_layer(layer, layer_in_df, df); @@ -547,8 +565,9 @@ fn load_edges_from_num_iter< I: Iterator, Option), Option)>, PI: Iterator>, IL: Iterator>, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( - graph: &InternalGraph, + graph: &G, size: usize, edges: I, properties: PI, @@ -579,8 +598,9 @@ fn load_nodes_from_num_iter< S: AsRef, I: Iterator, Option, Option<&'a str>)>, PI: Iterator>, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( - graph: &InternalGraph, + graph: &G, size: usize, nodes: I, properties: PI, diff --git a/raphtory/src/python/graph/pandas/mod.rs b/raphtory/src/io/arrow/mod.rs similarity index 94% rename from raphtory/src/python/graph/pandas/mod.rs rename to raphtory/src/io/arrow/mod.rs index 41b0394b98..6925701f00 100644 --- a/raphtory/src/python/graph/pandas/mod.rs +++ b/raphtory/src/io/arrow/mod.rs @@ -1,22 +1,19 @@ pub mod dataframe; -pub mod loaders; +pub mod df_loaders; mod prop_handler; #[cfg(test)] mod test { use crate::{ + io::arrow::{dataframe::DFView, df_loaders::*}, prelude::*, - python::graph::pandas::{ - dataframe::PretendDF, - loaders::{load_edges_from_df, load_nodes_from_df}, - }, }; use polars_arrow::array::{PrimitiveArray, Utf8Array}; use raphtory_api::core::storage::arc_str::ArcStr; #[test] fn load_edges_from_pretend_df() { - let df = PretendDF { + let df = DFView { names: vec!["src", "dst", "time", "prop1", "prop2"] .iter() .map(|s| s.to_string()) @@ -52,7 +49,7 @@ mod test { None, layer, layer_in_df, - &graph.0, + &graph, ) .expect("failed to load edges from pretend df"); @@ -88,7 +85,7 @@ mod test { #[test] fn load_nodes_from_pretend_df() { - let df = PretendDF { + let df = DFView { names: vec!["id", "name", "time", "node_type"] .iter() .map(|s| s.to_string()) @@ -120,7 +117,7 @@ mod test { None, Some("node_type"), false, - &graph.0, + &graph, ) .expect("failed to load nodes from pretend df"); diff --git a/raphtory/src/python/graph/pandas/prop_handler.rs b/raphtory/src/io/arrow/prop_handler.rs similarity index 99% rename from raphtory/src/python/graph/pandas/prop_handler.rs rename to raphtory/src/io/arrow/prop_handler.rs index 5b672c642f..c3d07979c4 100644 --- a/raphtory/src/python/graph/pandas/prop_handler.rs +++ b/raphtory/src/io/arrow/prop_handler.rs @@ -6,8 +6,8 @@ use polars_arrow::{ use crate::{ core::{utils::errors::GraphError, IntoPropList}, + io::arrow::dataframe::DFView, prelude::Prop, - python::graph::pandas::dataframe::PretendDF, }; pub struct PropIter<'a> { @@ -23,7 +23,7 @@ impl<'a> Iterator for PropIter<'a> { } pub(crate) fn get_prop_rows<'a>( - df: &'a PretendDF, + df: &'a DFView, props: Option>, const_props: Option>, ) -> Result<(PropIter<'a>, PropIter<'a>), GraphError> { @@ -34,7 +34,7 @@ pub(crate) fn get_prop_rows<'a>( fn combine_properties<'a>( props: Option>, - df: &'a PretendDF, + df: &'a DFView, ) -> Result, GraphError> { let iter = props .unwrap_or_default() @@ -148,7 +148,7 @@ fn validate_data_types(dt: &DataType) -> Result<(), GraphError> { pub(crate) fn lift_property<'a: 'b, 'b>( name: &'a str, - df: &'b PretendDF, + df: &'b DFView, ) -> Result> + 'b>, GraphError> { let idx = df .names @@ -383,7 +383,7 @@ pub(crate) fn lift_property<'a: 'b, 'b>( pub(crate) fn lift_layer<'a, S: AsRef>( layer: Option, layer_in_df: bool, - df: &'a PretendDF, + df: &'a DFView, ) -> Box> + 'a> { if let Some(layer) = layer { if layer_in_df { diff --git a/raphtory/src/graph_loader/source/csv_loader.rs b/raphtory/src/io/csv_loader.rs similarity index 97% rename from raphtory/src/graph_loader/source/csv_loader.rs rename to raphtory/src/io/csv_loader.rs index d6cc8f5129..1baa0fdd1a 100644 --- a/raphtory/src/graph_loader/source/csv_loader.rs +++ b/raphtory/src/io/csv_loader.rs @@ -4,8 +4,8 @@ //! ```no_run //! use std::path::{Path, PathBuf}; //! use regex::Regex; -//! use raphtory::graph_loader::source::csv_loader::CsvLoader; -//! use raphtory::graph_loader::example::lotr_graph::Lotr; +//! use raphtory::io::csv_loader::CsvLoader; +//! use raphtory::graph_loader::lotr_graph::Lotr; //! use raphtory::prelude::*; //! //! let g = Graph::new(); @@ -142,7 +142,7 @@ impl CsvLoader { /// /// ```no_run /// - /// use raphtory::graph_loader::source::csv_loader::CsvLoader; + /// use raphtory::io::csv_loader::CsvLoader; /// let loader = CsvLoader::new("/path/to/csv_file.csv"); /// ``` pub fn new>(p: P) -> Self { @@ -164,7 +164,7 @@ impl CsvLoader { /// # Example /// /// ```no_run - /// use raphtory::graph_loader::source::csv_loader::CsvLoader; + /// use raphtory::io::csv_loader::CsvLoader; /// let loader = CsvLoader::new("/path/to/csv_file.csv").set_header(true); /// ``` pub fn set_header(mut self, h: bool) -> Self { @@ -180,7 +180,7 @@ impl CsvLoader { /// /// # Example /// ```no_run - /// use raphtory::graph_loader::source::csv_loader::CsvLoader; + /// use raphtory::io::csv_loader::CsvLoader; /// let loader = CsvLoader::new("/path/to/csv_file.csv").set_print_file_name(true); /// ``` pub fn set_print_file_name(mut self, p: bool) -> Self { @@ -197,7 +197,7 @@ impl CsvLoader { /// # Example /// /// ```no_run - /// use raphtory::graph_loader::source::csv_loader::CsvLoader; + /// use raphtory::io::csv_loader::CsvLoader; /// let loader = CsvLoader::new("/path/to/csv_file.csv").set_delimiter("|"); /// ``` pub fn set_delimiter(mut self, d: &str) -> Self { @@ -215,7 +215,7 @@ impl CsvLoader { /// /// ```no_run /// use regex::Regex; - /// use raphtory::graph_loader::source::csv_loader::CsvLoader; + /// use raphtory::io::csv_loader::CsvLoader; /// /// let loader = CsvLoader::new("/path/to/csv_files") /// .with_filter(Regex::new(r"file_name_pattern").unwrap()); @@ -473,7 +473,7 @@ impl CsvLoader { #[cfg(test)] mod csv_loader_test { - use crate::{graph_loader::source::csv_loader::CsvLoader, prelude::*}; + use crate::{io::csv_loader::CsvLoader, prelude::*}; use csv::StringRecord; use regex::Regex; use serde::Deserialize; diff --git a/raphtory/src/graph_loader/source/json_loader.rs b/raphtory/src/io/json_loader.rs similarity index 100% rename from raphtory/src/graph_loader/source/json_loader.rs rename to raphtory/src/io/json_loader.rs diff --git a/raphtory/src/io/mod.rs b/raphtory/src/io/mod.rs new file mode 100644 index 0000000000..327e9b42c4 --- /dev/null +++ b/raphtory/src/io/mod.rs @@ -0,0 +1,7 @@ +#[cfg(feature = "arrow")] +pub(crate) mod arrow; +pub mod csv_loader; +pub mod json_loader; +pub mod neo4j_loader; +#[cfg(feature = "arrow")] +pub mod parquet_loaders; diff --git a/raphtory/src/graph_loader/source/neo4j_loader.rs b/raphtory/src/io/neo4j_loader.rs similarity index 98% rename from raphtory/src/graph_loader/source/neo4j_loader.rs rename to raphtory/src/io/neo4j_loader.rs index c437821727..882d771afd 100644 --- a/raphtory/src/graph_loader/source/neo4j_loader.rs +++ b/raphtory/src/io/neo4j_loader.rs @@ -50,7 +50,7 @@ impl Neo4JConnection { mod neo_loader_test { use crate::{ db::{api::mutation::AdditionOps, graph::graph as rap}, - graph_loader::source::neo4j_loader::Neo4JConnection, + io::neo4j_loader::Neo4JConnection, prelude::*, }; use neo4rs::*; diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs new file mode 100644 index 0000000000..0cf35b2586 --- /dev/null +++ b/raphtory/src/io/parquet_loaders.rs @@ -0,0 +1,341 @@ +use crate::{ + core::{utils::errors::GraphError, Prop}, + db::api::{ + mutation::internal::{InternalAdditionOps, InternalPropertyAdditionOps}, + view::StaticGraphViewOps, + }, + io::arrow::{dataframe::*, df_loaders::*}, + prelude::DeletionOps, +}; +use itertools::Itertools; +use polars_arrow::{ + array::Array, + datatypes::{ArrowDataType as DataType, ArrowSchema, Field}, + legacy::error, + record_batch::RecordBatch as Chunk, +}; +use polars_parquet::{ + read, + read::{read_metadata, FileMetaData, FileReader}, +}; +use std::{ + collections::HashMap, + fs, + path::{Path, PathBuf}, +}; + +pub fn load_nodes_from_parquet< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + graph: &G, + parquet_path: &Path, + id: &str, + time: &str, + node_type: Option<&str>, + node_type_in_df: Option, + properties: Option>, + const_properties: Option>, + shared_const_properties: Option>, +) -> Result<(), GraphError> { + let mut cols_to_check = vec![id, time]; + cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new())); + cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); + if node_type_in_df.unwrap_or(true) { + if let Some(ref node_type) = node_type { + cols_to_check.push(node_type.as_ref()); + } + } + + for path in get_parquet_file_paths(parquet_path)? { + let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; + df.check_cols_exist(&cols_to_check)?; + let size = df.get_inner_size(); + load_nodes_from_df( + &df, + size, + id, + time, + properties.clone(), + const_properties.clone(), + shared_const_properties.clone(), + node_type, + node_type_in_df.unwrap_or(true), + graph, + ) + .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?; + } + + Ok(()) +} + +pub fn load_edges_from_parquet< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + graph: &G, + parquet_path: &Path, + src: &str, + dst: &str, + time: &str, + properties: Option>, + const_properties: Option>, + shared_const_properties: Option>, + layer: Option<&str>, + layer_in_df: Option, +) -> Result<(), GraphError> { + let mut cols_to_check = vec![src, dst, time]; + cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new())); + cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); + if layer_in_df.unwrap_or(false) { + if let Some(ref layer) = layer { + cols_to_check.push(layer.as_ref()); + } + } + + for path in get_parquet_file_paths(parquet_path)? { + let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; + df.check_cols_exist(&cols_to_check)?; + let size = cols_to_check.len(); + load_edges_from_df( + &df, + size, + src, + dst, + time, + properties.clone(), + const_properties.clone(), + shared_const_properties.clone(), + layer, + layer_in_df.unwrap_or(true), + graph, + ) + .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?; + } + + Ok(()) +} + +pub fn load_node_props_from_parquet< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + graph: &G, + parquet_path: &Path, + id: &str, + const_properties: Option>, + shared_const_properties: Option>, +) -> Result<(), GraphError> { + let mut cols_to_check = vec![id]; + cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); + + for path in get_parquet_file_paths(parquet_path)? { + let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; + df.check_cols_exist(&cols_to_check)?; + let size = cols_to_check.len(); + load_node_props_from_df( + &df, + size, + id, + const_properties.clone(), + shared_const_properties.clone(), + graph, + ) + .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?; + } + + Ok(()) +} + +pub fn load_edge_props_from_parquet< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( + graph: &G, + parquet_path: &Path, + src: &str, + dst: &str, + const_properties: Option>, + shared_const_properties: Option>, + layer: Option<&str>, + layer_in_df: Option, +) -> Result<(), GraphError> { + let mut cols_to_check = vec![src, dst]; + if layer_in_df.unwrap_or(false) { + if let Some(ref layer) = layer { + cols_to_check.push(layer.as_ref()); + } + } + cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); + + for path in get_parquet_file_paths(parquet_path)? { + let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; + df.check_cols_exist(&cols_to_check)?; + let size = cols_to_check.len(); + load_edges_props_from_df( + &df, + size, + src, + dst, + const_properties.clone(), + shared_const_properties.clone(), + layer, + layer_in_df.unwrap_or(true), + graph, + ) + .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?; + } + + Ok(()) +} + +pub fn load_edges_deletions_from_parquet< + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + DeletionOps, +>( + graph: &G, + parquet_path: &Path, + src: &str, + dst: &str, + time: &str, + layer: Option<&str>, + layer_in_df: Option, +) -> Result<(), GraphError> { + let mut cols_to_check = vec![src, dst, time]; + if layer_in_df.unwrap_or(true) { + if let Some(ref layer) = layer { + cols_to_check.push(layer.as_ref()); + } + } + + for path in get_parquet_file_paths(parquet_path)? { + let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; + df.check_cols_exist(&cols_to_check)?; + let size = cols_to_check.len(); + load_edges_deletions_from_df( + &df, + size, + src, + dst, + time, + layer, + layer_in_df.unwrap_or(true), + graph, + ) + .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?; + } + + Ok(()) +} + +pub(crate) fn process_parquet_file_to_df( + parquet_file_path: &Path, + col_names: Vec<&str>, +) -> Result { + let (names, arrays) = read_parquet_file(parquet_file_path, &col_names)?; + + let names = names + .into_iter() + .filter(|x| col_names.contains(&x.as_str())) + .collect(); + let arrays = arrays + .map_ok(|r| r.into_iter().map(|boxed| boxed.clone()).collect_vec()) + .collect::, _>>()?; + + Ok(DFView { names, arrays }) +} + +fn read_parquet_file( + path: impl AsRef, + col_names: &Vec<&str>, +) -> Result< + ( + Vec, + impl Iterator>, error::PolarsError>>, + ), + GraphError, +> { + let read_schema = |metadata: &FileMetaData| -> Result { + let schema = read::infer_schema(metadata)?; + let fields = schema + .fields + .iter() + .map(|f| { + if f.data_type == DataType::Utf8View { + Field::new(f.name.clone(), DataType::LargeUtf8, f.is_nullable) + } else { + f.clone() + } + }) + .filter(|f| { + // Filtered fields to avoid loading data that is not needed + col_names.contains(&f.name.as_str()) + }) + .collect::>(); + + Ok(ArrowSchema::from(fields).with_metadata(schema.metadata)) + }; + + let mut file = std::fs::File::open(&path)?; + let metadata = read_metadata(&mut file)?; + let row_groups = metadata.clone().row_groups; + let schema = read_schema(&metadata)?; + + // Although fields are already filtered by col_names, we need names in the order as it appears + // in the schema to create PretendDF + let names = schema.fields.iter().map(|f| f.name.clone()).collect_vec(); + + let reader = FileReader::new(file, row_groups, schema, None, None, None); + Ok((names, reader)) +} + +fn get_parquet_file_paths(parquet_path: &Path) -> Result, GraphError> { + let mut parquet_files = Vec::new(); + if parquet_path.is_file() { + parquet_files.push(parquet_path.to_path_buf()); + } else if parquet_path.is_dir() { + for entry in fs::read_dir(parquet_path).expect("Directory not found") { + let entry = entry.expect("Unable to read entry"); + let path = entry.path(); + if path.extension().map_or(false, |ext| ext == "parquet") { + parquet_files.push(path); + } + } + } else { + return Err(GraphError::InvalidPath(parquet_path.display().to_string())); + } + + Ok(parquet_files) +} + +#[cfg(test)] +mod test { + use super::*; + use polars_arrow::array::{PrimitiveArray, Utf8Array}; + use std::path::PathBuf; + + #[test] + fn test_process_parquet_file_to_df() { + let parquet_file_path = + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("resources/test/test_data.parquet"); + + let col_names = vec!["src", "dst", "time", "weight", "marbles"]; + let df = process_parquet_file_to_df(parquet_file_path.as_path(), col_names).unwrap(); + + let df1 = DFView { + names: vec!["src", "dst", "time", "weight", "marbles"] + .iter() + .map(|s| s.to_string()) + .collect(), + arrays: vec![vec![ + Box::new(PrimitiveArray::::from_values(vec![1, 2, 3, 4, 5])), + Box::new(PrimitiveArray::::from_values(vec![2, 3, 4, 5, 6])), + Box::new(PrimitiveArray::::from_values(vec![1, 2, 3, 4, 5])), + Box::new(PrimitiveArray::::from_values(vec![ + 1f64, 2f64, 3f64, 4f64, 5f64, + ])), + Box::new(Utf8Array::::from_iter_values( + vec!["red", "blue", "green", "yellow", "purple"].into_iter(), + )), + ]], + }; + + assert_eq!(df.names, df1.names); + assert_eq!(df.arrays, df1.arrays); + } +} diff --git a/raphtory/src/lib.rs b/raphtory/src/lib.rs index a0c5fded07..1121df1def 100644 --- a/raphtory/src/lib.rs +++ b/raphtory/src/lib.rs @@ -104,6 +104,9 @@ pub mod search; #[cfg(feature = "vectors")] pub mod vectors; +#[cfg(feature = "io")] +pub mod io; + pub mod prelude { pub const NO_PROPS: [(&str, Prop); 0] = []; pub use crate::{ diff --git a/raphtory/src/python/graph/disk_graph.rs b/raphtory/src/python/graph/disk_graph.rs index 8ae7c94716..3bfe1460a5 100644 --- a/raphtory/src/python/graph/disk_graph.rs +++ b/raphtory/src/python/graph/disk_graph.rs @@ -33,7 +33,8 @@ use pyo3::{ types::{IntoPyDict, PyDict, PyList, PyString}, }; -use super::pandas::dataframe::{process_pandas_py_df, PretendDF}; +use super::io::pandas_loaders::*; +use crate::io::arrow::dataframe::DFView; impl From for PyErr { fn from(value: Error) -> Self { @@ -172,7 +173,7 @@ impl PyDiskGraph { let df_columns: Vec = edge_df.getattr("columns")?.extract()?; let df_columns: Vec<&str> = df_columns.iter().map(|x| x.as_str()).collect(); - let df = process_pandas_py_df(edge_df, py, size, df_columns)?; + let df = process_pandas_py_df(edge_df, py, df_columns)?; df.check_cols_exist(&cols_to_check)?; let graph = Self::from_pandas(graph_dir, df, src_col, dst_col, time_col)?; @@ -239,7 +240,7 @@ impl PyDiskGraph { impl PyDiskGraph { fn from_pandas( graph_dir: &str, - df: PretendDF, + df: DFView, src: &str, dst: &str, time: &str, diff --git a/raphtory/src/python/graph/graph.rs b/raphtory/src/python/graph/graph.rs index 5312fc9f10..44764179ca 100644 --- a/raphtory/src/python/graph/graph.rs +++ b/raphtory/src/python/graph/graph.rs @@ -3,7 +3,6 @@ //! This is the base class used to create a temporal graph, add nodes and edges, //! create windows, and query the graph with a variety of algorithms. //! In Python, this class wraps around the rust graph. -use super::utils; use crate::{ algorithms::components::LargestConnectedComponent, core::{entities::nodes::node_ref::NodeRef, utils::errors::GraphError}, @@ -11,11 +10,12 @@ use crate::{ api::view::internal::{CoreGraphOps, DynamicGraph, IntoDynamic, MaterializedGraph}, graph::{edge::EdgeView, node::NodeView, views::node_subgraph::NodeSubgraph}, }, + io::parquet_loaders::*, prelude::*, python::{ graph::{ - edge::PyEdge, graph_with_deletions::PyPersistentGraph, node::PyNode, - views::graph_view::PyGraphView, + edge::PyEdge, graph_with_deletions::PyPersistentGraph, io::pandas_loaders::*, + node::PyNode, views::graph_view::PyGraphView, }, utils::{PyInputNode, PyTime}, }, @@ -25,7 +25,8 @@ use raphtory_api::core::storage::arc_str::ArcStr; use std::{ collections::HashMap, fmt::{Debug, Formatter}, - path::Path, + ops::Deref, + path::{Path, PathBuf}, }; /// A temporal graph. @@ -459,6 +460,83 @@ impl PyGraph { Ok(graph.graph) } + /// Load a graph from Parquet file. + /// + /// Args: + /// edge_parquet_path (str): Parquet file or directory of Parquet files containing the edges. + /// edge_src (str): The column name for the source node ids. + /// edge_dst (str): The column name for the destination node ids. + /// edge_time (str): The column name for the timestamps. + /// edge_properties (list): The column names for the temporal properties (optional) Defaults to None. + /// edge_const_properties (list): The column names for the constant properties (optional) Defaults to None. + /// edge_shared_const_properties (dict): A dictionary of constant properties that will be added to every edge (optional) Defaults to None. + /// edge_layer (str): The edge layer name (optional) Defaults to None. + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the edge_df or if it should be used directly as the layer for all edges (optional) defaults to True. + /// node_parquet_path (str): Parquet file or directory of Parquet files containing the nodes (optional) Defaults to None. + /// node_id (str): The column name for the node ids (optional) Defaults to None. + /// node_time (str): The column name for the node timestamps (optional) Defaults to None. + /// node_properties (list): The column names for the node temporal properties (optional) Defaults to None. + /// node_const_properties (list): The column names for the node constant properties (optional) Defaults to None. + /// node_shared_const_properties (dict): A dictionary of constant properties that will be added to every node (optional) Defaults to None. + /// node_type (str): the column name for the node type + /// node_type_in_df (bool): whether the node type should be used to look up the values in a column of the df or if it should be used directly as the node type + /// + /// Returns: + /// Graph: The loaded Graph object. + #[staticmethod] + #[pyo3(signature = (edge_parquet_path, edge_src, edge_dst, edge_time, edge_properties = None, edge_const_properties = None, edge_shared_const_properties = None, + edge_layer = None, layer_in_df = true, node_parquet_path = None, node_id = None, node_time = None, node_properties = None, + node_const_properties = None, node_shared_const_properties = None, node_type = None, node_type_in_df = true))] + fn load_from_parquet( + edge_parquet_path: PathBuf, + edge_src: &str, + edge_dst: &str, + edge_time: &str, + edge_properties: Option>, + edge_const_properties: Option>, + edge_shared_const_properties: Option>, + edge_layer: Option<&str>, + layer_in_df: Option, + node_parquet_path: Option, + node_id: Option<&str>, + node_time: Option<&str>, + node_properties: Option>, + node_const_properties: Option>, + node_shared_const_properties: Option>, + node_type: Option<&str>, + node_type_in_df: Option, + ) -> Result { + let graph = PyGraph { + graph: Graph::new(), + }; + if let (Some(node_parquet_path), Some(node_id), Some(node_time)) = + (node_parquet_path, node_id, node_time) + { + graph.load_nodes_from_parquet( + node_parquet_path, + node_id, + node_time, + node_type, + node_type_in_df, + node_properties, + node_const_properties, + node_shared_const_properties, + )?; + } + graph.load_edges_from_parquet( + edge_parquet_path, + edge_src, + edge_dst, + edge_time, + edge_properties, + edge_const_properties, + edge_shared_const_properties, + edge_layer, + layer_in_df, + )?; + Ok(graph.graph) + } + /// Load nodes from a Pandas DataFrame into the graph. /// /// Arguments: @@ -484,7 +562,7 @@ impl PyGraph { const_properties: Option>, shared_const_properties: Option>, ) -> Result<(), GraphError> { - utils::load_nodes_from_pandas( + load_nodes_from_pandas( &self.graph.0, df, id, @@ -497,6 +575,44 @@ impl PyGraph { ) } + /// Load nodes from a Parquet file into the graph. + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files containing the nodes + /// id (str): The column name for the node IDs. + /// time (str): The column name for the timestamps. + /// node_type (str): the column name for the node type + /// node_type_in_df (bool): whether the node type should be used to look up the values in a column of the df or if it should be used directly as the node type + /// properties (List): List of node property column names. Defaults to None. (optional) + /// const_properties (List): List of constant node property column names. Defaults to None. (optional) + /// shared_const_properties (Dictionary/Hashmap of properties): A dictionary of constant properties that will be added to every node. Defaults to None. (optional) + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, id, time, node_type = None, node_type_in_df = true, properties = None, const_properties = None, shared_const_properties = None))] + fn load_nodes_from_parquet( + &self, + parquet_path: PathBuf, + id: &str, + time: &str, + node_type: Option<&str>, + node_type_in_df: Option, + properties: Option>, + const_properties: Option>, + shared_const_properties: Option>, + ) -> Result<(), GraphError> { + load_nodes_from_parquet( + &self.graph, + parquet_path.as_path(), + id, + time, + node_type, + node_type_in_df, + properties, + const_properties, + shared_const_properties, + ) + } + /// Load edges from a Pandas DataFrame into the graph. /// /// Arguments: @@ -525,7 +641,7 @@ impl PyGraph { layer: Option<&str>, layer_in_df: Option, ) -> Result<(), GraphError> { - utils::load_edges_from_pandas( + load_edges_from_pandas( &self.graph.0, df, src, @@ -539,6 +655,48 @@ impl PyGraph { ) } + /// Load edges from a Parquet file into the graph. + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files path containing edges + /// src (str): The column name for the source node ids. + /// dst (str): The column name for the destination node ids. + /// time (str): The column name for the update timestamps. + /// properties (List): List of edge property column names. Defaults to None. (optional) + /// const_properties (List): List of constant edge property column names. Defaults to None. (optional) + /// shared_const_properties (dict): A dictionary of constant properties that will be added to every edge. Defaults to None. (optional) + /// layer (str): The edge layer name (optional) Defaults to None. + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the dataframe or if it should be used directly as the layer for all edges (optional) defaults to True. + /// + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, src, dst, time, properties = None, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))] + fn load_edges_from_parquet( + &self, + parquet_path: PathBuf, + src: &str, + dst: &str, + time: &str, + properties: Option>, + const_properties: Option>, + shared_const_properties: Option>, + layer: Option<&str>, + layer_in_df: Option, + ) -> Result<(), GraphError> { + load_edges_from_parquet( + &self.graph, + parquet_path.as_path(), + src, + dst, + time, + properties, + const_properties, + shared_const_properties, + layer, + layer_in_df, + ) + } + /// Load node properties from a Pandas DataFrame. /// /// Arguments: @@ -557,7 +715,7 @@ impl PyGraph { const_properties: Option>, shared_const_properties: Option>, ) -> Result<(), GraphError> { - utils::load_node_props_from_pandas( + load_node_props_from_pandas( &self.graph.0, df, id, @@ -566,6 +724,33 @@ impl PyGraph { ) } + /// Load node properties from a parquet file. + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files path containing node information. + /// id(str): The column name for the node IDs. + /// const_properties (List): List of constant node property column names. Defaults to None. (optional) + /// shared_const_properties (>): A dictionary of constant properties that will be added to every node. Defaults to None. (optional) + /// + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, id, const_properties = None, shared_const_properties = None))] + fn load_node_props_from_parquet( + &self, + parquet_path: PathBuf, + id: &str, + const_properties: Option>, + shared_const_properties: Option>, + ) -> Result<(), GraphError> { + load_node_props_from_parquet( + &self.graph, + parquet_path.as_path(), + id, + const_properties, + shared_const_properties, + ) + } + /// Load edge properties from a Pandas DataFrame. /// /// Arguments: @@ -590,7 +775,7 @@ impl PyGraph { layer: Option<&str>, layer_in_df: Option, ) -> Result<(), GraphError> { - utils::load_edge_props_from_pandas( + load_edge_props_from_pandas( &self.graph.0, df, src, @@ -601,4 +786,40 @@ impl PyGraph { layer_in_df, ) } + + /// Load edge properties from parquet file + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files path containing edge information. + /// src (str): The column name for the source node. + /// dst (str): The column name for the destination node. + /// const_properties (List): List of constant edge property column names. Defaults to None. (optional) + /// shared_const_properties (dict): A dictionary of constant properties that will be added to every edge. Defaults to None. (optional) + /// layer (str): Layer name. Defaults to None. (optional) + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the data frame or if it should be used directly as the layer for all edges (optional) defaults to True. + /// + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, src, dst, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))] + fn load_edge_props_from_parquet( + &self, + parquet_path: PathBuf, + src: &str, + dst: &str, + const_properties: Option>, + shared_const_properties: Option>, + layer: Option<&str>, + layer_in_df: Option, + ) -> Result<(), GraphError> { + load_edge_props_from_parquet( + &self.graph, + parquet_path.as_path(), + src, + dst, + const_properties, + shared_const_properties, + layer, + layer_in_df, + ) + } } diff --git a/raphtory/src/python/graph/graph_with_deletions.rs b/raphtory/src/python/graph/graph_with_deletions.rs index 2a565f7c21..8c50c0d9ad 100644 --- a/raphtory/src/python/graph/graph_with_deletions.rs +++ b/raphtory/src/python/graph/graph_with_deletions.rs @@ -5,14 +5,6 @@ //! create windows, and query the graph with a variety of algorithms. //! It is a wrapper around a set of shards, which are the actual graph data structures. //! In Python, this class wraps around the rust graph. -use super::{ - graph::PyGraph, - pandas::{ - dataframe::{process_pandas_py_df, GraphLoadException}, - loaders::load_edges_deletions_from_df, - }, - utils, -}; use crate::{ core::{entities::nodes::node_ref::NodeRef, utils::errors::GraphError, Prop}, db::{ @@ -28,10 +20,7 @@ use crate::{ utils::{PyInputNode, PyTime}, }, }; -use pyo3::{ - prelude::*, - types::{IntoPyDict, PyBytes}, -}; +use pyo3::{prelude::*, types::PyBytes}; use raphtory_api::core::storage::arc_str::ArcStr; use std::{ collections::HashMap, @@ -39,6 +28,9 @@ use std::{ path::{Path, PathBuf}, }; +use super::{graph::PyGraph, io::pandas_loaders::*}; +use crate::io::parquet_loaders::*; + /// A temporal graph that allows edges and nodes to be deleted. #[derive(Clone)] #[pyclass(name = "PersistentGraph", extends = PyGraphView)] @@ -455,6 +447,83 @@ impl PyPersistentGraph { Ok(graph.graph) } + /// Load a graph from Parquet file. + /// + /// Args: + /// edge_parquet_path (str): Parquet file or directory of Parquet files containing the edges. + /// edge_src (str): The column name for the source node ids. + /// edge_dst (str): The column name for the destination node ids. + /// edge_time (str): The column name for the timestamps. + /// edge_properties (list): The column names for the temporal properties (optional) Defaults to None. + /// edge_const_properties (list): The column names for the constant properties (optional) Defaults to None. + /// edge_shared_const_properties (dict): A dictionary of constant properties that will be added to every edge (optional) Defaults to None. + /// edge_layer (str): The edge layer name (optional) Defaults to None. + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the edge_df or if it should be used directly as the layer for all edges (optional) defaults to True. + /// node_parquet_path (str): Parquet file or directory of Parquet files containing the nodes (optional) Defaults to None. + /// node_id (str): The column name for the node ids (optional) Defaults to None. + /// node_time (str): The column name for the node timestamps (optional) Defaults to None. + /// node_properties (list): The column names for the node temporal properties (optional) Defaults to None. + /// node_const_properties (list): The column names for the node constant properties (optional) Defaults to None. + /// node_shared_const_properties (dict): A dictionary of constant properties that will be added to every node (optional) Defaults to None. + /// node_type (str): the column name for the node type + /// node_type_in_df (bool): whether the node type should be used to look up the values in a column of the df or if it should be used directly as the node type + /// + /// Returns: + /// Graph: The loaded Graph object. + #[staticmethod] + #[pyo3(signature = (edge_parquet_path, edge_src, edge_dst, edge_time, edge_properties = None, edge_const_properties = None, edge_shared_const_properties = None, + edge_layer = None, layer_in_df = true, node_parquet_path = None, node_id = None, node_time = None, node_properties = None, + node_const_properties = None, node_shared_const_properties = None, node_type = None, node_type_in_df = true))] + fn load_from_parquet( + edge_parquet_path: PathBuf, + edge_src: &str, + edge_dst: &str, + edge_time: &str, + edge_properties: Option>, + edge_const_properties: Option>, + edge_shared_const_properties: Option>, + edge_layer: Option<&str>, + layer_in_df: Option, + node_parquet_path: Option, + node_id: Option<&str>, + node_time: Option<&str>, + node_properties: Option>, + node_const_properties: Option>, + node_shared_const_properties: Option>, + node_type: Option<&str>, + node_type_in_df: Option, + ) -> Result { + let graph = PyPersistentGraph { + graph: PersistentGraph::new(), + }; + if let (Some(node_parquet_file_path), Some(node_id), Some(node_time)) = + (node_parquet_path, node_id, node_time) + { + graph.load_nodes_from_parquet( + node_parquet_file_path, + node_id, + node_time, + node_type, + node_type_in_df, + node_properties, + node_const_properties, + node_shared_const_properties, + )?; + } + graph.load_edges_from_parquet( + edge_parquet_path, + edge_src, + edge_dst, + edge_time, + edge_properties, + edge_const_properties, + edge_shared_const_properties, + edge_layer, + layer_in_df, + )?; + Ok(graph.graph) + } + /// Load nodes from a Pandas DataFrame into the graph. /// /// Arguments: @@ -480,7 +549,7 @@ impl PyPersistentGraph { const_properties: Option>, shared_const_properties: Option>, ) -> Result<(), GraphError> { - utils::load_nodes_from_pandas( + load_nodes_from_pandas( &self.graph.0, df, id, @@ -493,6 +562,44 @@ impl PyPersistentGraph { ) } + /// Load nodes from a Parquet file into the graph. + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files containing the nodes + /// id (str): The column name for the node IDs. + /// time (str): The column name for the timestamps. + /// node_type (str): the column name for the node type + /// node_type_in_df (bool): whether the node type should be used to look up the values in a column of the df or if it should be used directly as the node type + /// properties (List): List of node property column names. Defaults to None. (optional) + /// const_properties (List): List of constant node property column names. Defaults to None. (optional) + /// shared_const_properties (Dictionary/Hashmap of properties): A dictionary of constant properties that will be added to every node. Defaults to None. (optional) + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, id, time, node_type = None, node_type_in_df = true, properties = None, const_properties = None, shared_const_properties = None))] + fn load_nodes_from_parquet( + &self, + parquet_path: PathBuf, + id: &str, + time: &str, + node_type: Option<&str>, + node_type_in_df: Option, + properties: Option>, + const_properties: Option>, + shared_const_properties: Option>, + ) -> Result<(), GraphError> { + load_nodes_from_parquet( + &self.graph, + parquet_path.as_path(), + id, + time, + node_type, + node_type_in_df, + properties, + const_properties, + shared_const_properties, + ) + } + /// Load edges from a Pandas DataFrame into the graph. /// /// Arguments: @@ -504,7 +611,7 @@ impl PyPersistentGraph { /// const_properties (List): List of constant edge property column names. Defaults to None. (optional) /// shared_const_properties (dict): A dictionary of constant properties that will be added to every edge. Defaults to None. (optional) /// layer (str): The edge layer name (optional) Defaults to None. - /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the dateframe or if it should be used directly as the layer for all edges (optional) defaults to True. + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the dataframe or if it should be used directly as the layer for all edges (optional) defaults to True. /// /// Returns: /// Result<(), GraphError>: Result of the operation. @@ -521,7 +628,7 @@ impl PyPersistentGraph { layer: Option<&str>, layer_in_df: Option, ) -> Result<(), GraphError> { - utils::load_edges_from_pandas( + load_edges_from_pandas( &self.graph.0, df, src, @@ -535,6 +642,48 @@ impl PyPersistentGraph { ) } + /// Load edges from a Parquet file into the graph. + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files path containing edges + /// src (str): The column name for the source node ids. + /// dst (str): The column name for the destination node ids. + /// time (str): The column name for the update timestamps. + /// properties (List): List of edge property column names. Defaults to None. (optional) + /// const_properties (List): List of constant edge property column names. Defaults to None. (optional) + /// shared_const_properties (dict): A dictionary of constant properties that will be added to every edge. Defaults to None. (optional) + /// layer (str): The edge layer name (optional) Defaults to None. + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the dataframe or if it should be used directly as the layer for all edges (optional) defaults to True. + /// + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, src, dst, time, properties = None, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))] + fn load_edges_from_parquet( + &self, + parquet_path: PathBuf, + src: &str, + dst: &str, + time: &str, + properties: Option>, + const_properties: Option>, + shared_const_properties: Option>, + layer: Option<&str>, + layer_in_df: Option, + ) -> Result<(), GraphError> { + load_edges_from_parquet( + &self.graph, + parquet_path.as_path(), + src, + dst, + time, + properties, + const_properties, + shared_const_properties, + layer, + layer_in_df, + ) + } + /// Load edges deletions from a Pandas DataFrame into the graph. /// /// Arguments: @@ -543,7 +692,7 @@ impl PyPersistentGraph { /// dst (str): The column name for the destination node ids. /// time (str): The column name for the update timestamps. /// layer (str): The edge layer name (optional) Defaults to None. - /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the dateframe or if it should be used directly as the layer for all edges (optional) defaults to True. + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the dataframe or if it should be used directly as the layer for all edges (optional) defaults to True. /// /// Returns: /// Result<(), GraphError>: Result of the operation. @@ -557,42 +706,40 @@ impl PyPersistentGraph { layer: Option<&str>, layer_in_df: Option, ) -> Result<(), GraphError> { - let graph = &self.graph.0; - Python::with_gil(|py| { - let size: usize = py - .eval( - "index.__len__()", - Some([("index", df.getattr("index")?)].into_py_dict(py)), - None, - )? - .extract()?; - - let mut cols_to_check = vec![src, dst, time]; - if layer_in_df.unwrap_or(true) { - if let Some(ref layer) = layer { - cols_to_check.push(layer.as_ref()); - } - } - - let df = process_pandas_py_df(df, py, size, cols_to_check.clone())?; - - df.check_cols_exist(&cols_to_check)?; - load_edges_deletions_from_df( - &df, - size, - src, - dst, - time, - layer, - layer_in_df.unwrap_or(true), - graph, - ) - .map_err(|e| GraphLoadException::new_err(format!("{:?}", e)))?; + load_edges_deletions_from_pandas(&self.graph.0, df, src, dst, time, layer, layer_in_df) + } - Ok::<(), PyErr>(()) - }) - .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?; - Ok(()) + /// Load edges deletions from a Parquet file into the graph. + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files path containing node information. + /// src (str): The column name for the source node ids. + /// dst (str): The column name for the destination node ids. + /// time (str): The column name for the update timestamps. + /// layer (str): The edge layer name (optional) Defaults to None. + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the dataframe or if it should be used directly as the layer for all edges (optional) defaults to True. + /// + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, src, dst, time, layer = None, layer_in_df = true))] + fn load_edges_deletions_from_parquet( + &self, + parquet_path: PathBuf, + src: &str, + dst: &str, + time: &str, + layer: Option<&str>, + layer_in_df: Option, + ) -> Result<(), GraphError> { + load_edges_deletions_from_parquet( + &self.graph, + parquet_path.as_path(), + src, + dst, + time, + layer, + layer_in_df, + ) } /// Load node properties from a Pandas DataFrame. @@ -613,7 +760,7 @@ impl PyPersistentGraph { const_properties: Option>, shared_const_properties: Option>, ) -> Result<(), GraphError> { - utils::load_node_props_from_pandas( + load_node_props_from_pandas( &self.graph.0, df, id, @@ -622,6 +769,33 @@ impl PyPersistentGraph { ) } + /// Load node properties from a parquet file. + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files path containing node information. + /// id(str): The column name for the node IDs. + /// const_properties (List): List of constant node property column names. Defaults to None. (optional) + /// shared_const_properties (>): A dictionary of constant properties that will be added to every node. Defaults to None. (optional) + /// + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, id, const_properties = None, shared_const_properties = None))] + fn load_node_props_from_parquet( + &self, + parquet_path: PathBuf, + id: &str, + const_properties: Option>, + shared_const_properties: Option>, + ) -> Result<(), GraphError> { + load_node_props_from_parquet( + &self.graph, + parquet_path.as_path(), + id, + const_properties, + shared_const_properties, + ) + } + /// Load edge properties from a Pandas DataFrame. /// /// Arguments: @@ -646,7 +820,7 @@ impl PyPersistentGraph { layer: Option<&str>, layer_in_df: Option, ) -> Result<(), GraphError> { - utils::load_edge_props_from_pandas( + load_edge_props_from_pandas( &self.graph.0, df, src, @@ -657,4 +831,40 @@ impl PyPersistentGraph { layer_in_df, ) } + + /// Load edge properties from parquet file + /// + /// Arguments: + /// parquet_path (str): Parquet file or directory of Parquet files path containing edge information. + /// src (str): The column name for the source node. + /// dst (str): The column name for the destination node. + /// const_properties (List): List of constant edge property column names. Defaults to None. (optional) + /// shared_const_properties (dict): A dictionary of constant properties that will be added to every edge. Defaults to None. (optional) + /// layer (str): Layer name. Defaults to None. (optional) + /// layer_in_df (bool): Whether the layer name should be used to look up the values in a column of the data frame or if it should be used directly as the layer for all edges (optional) defaults to True. + /// + /// Returns: + /// Result<(), GraphError>: Result of the operation. + #[pyo3(signature = (parquet_path, src, dst, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))] + fn load_edge_props_from_parquet( + &self, + parquet_path: PathBuf, + src: &str, + dst: &str, + const_properties: Option>, + shared_const_properties: Option>, + layer: Option<&str>, + layer_in_df: Option, + ) -> Result<(), GraphError> { + load_edge_props_from_parquet( + &self.graph, + parquet_path.as_path(), + src, + dst, + const_properties, + shared_const_properties, + layer, + layer_in_df, + ) + } } diff --git a/raphtory/src/python/graph/io/mod.rs b/raphtory/src/python/graph/io/mod.rs new file mode 100644 index 0000000000..a25698fdfa --- /dev/null +++ b/raphtory/src/python/graph/io/mod.rs @@ -0,0 +1,9 @@ +use polars_arrow::array::Array; +use pyo3::{create_exception, exceptions::PyException}; + +pub mod pandas_loaders; + +pub type ArrayRef = Box; + +create_exception!(exceptions, ArrowErrorException, PyException); +create_exception!(exceptions, GraphLoadException, PyException); diff --git a/raphtory/src/python/graph/utils.rs b/raphtory/src/python/graph/io/pandas_loaders.rs similarity index 50% rename from raphtory/src/python/graph/utils.rs rename to raphtory/src/python/graph/io/pandas_loaders.rs index 98701b92cb..20856e3f5c 100644 --- a/raphtory/src/python/graph/utils.rs +++ b/raphtory/src/python/graph/io/pandas_loaders.rs @@ -1,13 +1,11 @@ -use crate::core::{entities::graph::tgraph::InternalGraph, utils::errors::GraphError, Prop}; -use pyo3::{prelude::*, types::IntoPyDict}; -use std::collections::HashMap; - -use super::pandas::{ - dataframe::{process_pandas_py_df, GraphLoadException}, - loaders::{ - load_edges_from_df, load_edges_props_from_df, load_node_props_from_df, load_nodes_from_df, - }, +use crate::{ + core::{entities::graph::tgraph::InternalGraph, utils::errors::GraphError, Prop}, + io::arrow::{dataframe::*, df_loaders::*}, + python::graph::io::*, }; +use polars_arrow::{array::Array, ffi}; +use pyo3::{ffi::Py_uintptr_t, prelude::*, types::IntoPyDict}; +use std::collections::HashMap; pub fn load_nodes_from_pandas( graph: &InternalGraph, @@ -38,7 +36,7 @@ pub fn load_nodes_from_pandas( } } - let df = process_pandas_py_df(df, py, size, cols_to_check.clone())?; + let df = process_pandas_py_df(df, py, cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; load_nodes_from_df( @@ -90,7 +88,7 @@ pub fn load_edges_from_pandas( } } - let df = process_pandas_py_df(df, py, size, cols_to_check.clone())?; + let df = process_pandas_py_df(df, py, cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; load_edges_from_df( @@ -131,7 +129,7 @@ pub fn load_node_props_from_pandas( .extract()?; let mut cols_to_check = vec![id]; cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); - let df = process_pandas_py_df(df, py, size, cols_to_check.clone())?; + let df = process_pandas_py_df(df, py, cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; load_node_props_from_df( @@ -175,7 +173,7 @@ pub fn load_edge_props_from_pandas( } } cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); - let df = process_pandas_py_df(df, py, size, cols_to_check.clone())?; + let df = process_pandas_py_df(df, py, cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; load_edges_props_from_df( &df, @@ -195,3 +193,162 @@ pub fn load_edge_props_from_pandas( .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?; Ok(()) } + +pub fn load_edges_deletions_from_pandas( + graph: &InternalGraph, + df: &PyAny, + src: &str, + dst: &str, + time: &str, + layer: Option<&str>, + layer_in_df: Option, +) -> Result<(), GraphError> { + Python::with_gil(|py| { + let size: usize = py + .eval( + "index.__len__()", + Some([("index", df.getattr("index")?)].into_py_dict(py)), + None, + )? + .extract()?; + + let mut cols_to_check = vec![src, dst, time]; + if layer_in_df.unwrap_or(true) { + if let Some(ref layer) = layer { + cols_to_check.push(layer.as_ref()); + } + } + + let df = process_pandas_py_df(df, py, cols_to_check.clone())?; + df.check_cols_exist(&cols_to_check)?; + + load_edges_deletions_from_df( + &df, + size, + src, + dst, + time, + layer, + layer_in_df.unwrap_or(true), + graph, + ) + .map_err(|e| GraphLoadException::new_err(format!("{:?}", e)))?; + + Ok::<(), PyErr>(()) + }) + .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?; + Ok(()) +} + +pub(crate) fn process_pandas_py_df( + df: &PyAny, + py: Python, + col_names: Vec<&str>, +) -> PyResult { + is_jupyter(py); + py.import("pandas")?; + let module = py.import("pyarrow")?; + let pa_table = module.getattr("Table")?; + + let df_columns: Vec = df.getattr("columns")?.extract()?; + + let cols_to_drop: Vec = df_columns + .into_iter() + .filter(|x| !col_names.contains(&x.as_str())) + .collect(); + + let dropped_df = if !cols_to_drop.is_empty() { + let drop_method = df.getattr("drop")?; + drop_method.call((cols_to_drop,), Some(vec![("axis", 1)].into_py_dict(py)))? + } else { + df + }; + + let _df_columns: Vec = dropped_df.getattr("columns")?.extract()?; + + let table = pa_table.call_method("from_pandas", (dropped_df,), None)?; + + let rb = table.call_method0("to_batches")?.extract::>()?; + let names: Vec = if let Some(batch0) = rb.get(0) { + let schema = batch0.getattr("schema")?; + schema.getattr("names")?.extract::>()? + } else { + vec![] + } + .into_iter() + .filter(|x| col_names.contains(&x.as_str())) + .collect(); + + let arrays = rb + .iter() + .map(|rb| { + (0..names.len()) + .map(|i| { + let array = rb.call_method1("column", (i,))?; + let arr = array_to_rust(array)?; + Ok::, PyErr>(arr) + }) + .collect::, PyErr>>() + }) + .collect::, PyErr>>()?; + + let df = DFView { names, arrays }; + Ok(df) +} + +pub fn array_to_rust(obj: &PyAny) -> PyResult { + // prepare a pointer to receive the Array struct + let array = Box::new(ffi::ArrowArray::empty()); + let schema = Box::new(ffi::ArrowSchema::empty()); + + let array_ptr = &*array as *const ffi::ArrowArray; + let schema_ptr = &*schema as *const ffi::ArrowSchema; + + // make the conversion through PyArrow's private API + // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds + obj.call_method1( + "_export_to_c", + (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), + )?; + + unsafe { + let field = ffi::import_field_from_c(schema.as_ref()) + .map_err(|e| ArrowErrorException::new_err(format!("{:?}", e)))?; + + let array = ffi::import_array_from_c(*array, field.data_type) + .map_err(|e| ArrowErrorException::new_err(format!("{:?}", e)))?; + + Ok(array) + } +} + +fn is_jupyter(py: Python) { + let code = r#" +try: + shell = get_ipython().__class__.__name__ + if shell == 'ZMQInteractiveShell': + result = True # Jupyter notebook or qtconsole + elif shell == 'TerminalInteractiveShell': + result = False # Terminal running IPython + else: + result = False # Other type, assuming not a Jupyter environment +except NameError: + result = False # Probably standard Python interpreter +"#; + + if let Err(e) = py.run(code, None, None) { + println!("Error checking if running in a jupyter notebook: {}", e); + return; + } + + match py.eval("result", None, None) { + Ok(x) => { + if let Ok(x) = x.extract() { + kdam::set_notebook(x); + } + } + Err(e) => { + println!("Error checking if running in a jupyter notebook: {}", e); + } + }; +} diff --git a/raphtory/src/python/graph/mod.rs b/raphtory/src/python/graph/mod.rs index 7b457b6a1b..e32a05c680 100644 --- a/raphtory/src/python/graph/mod.rs +++ b/raphtory/src/python/graph/mod.rs @@ -10,8 +10,7 @@ pub mod graph_with_deletions; pub mod edges; #[cfg(feature = "search")] pub mod index; +pub mod io; pub mod node; -pub mod pandas; pub mod properties; -pub mod utils; pub mod views; diff --git a/raphtory/src/python/graph/pandas/dataframe.rs b/raphtory/src/python/graph/pandas/dataframe.rs deleted file mode 100644 index a38fd90e19..0000000000 --- a/raphtory/src/python/graph/pandas/dataframe.rs +++ /dev/null @@ -1,218 +0,0 @@ -use crate::core::utils::errors::GraphError; - -use polars_arrow::{ - array::{Array, PrimitiveArray, Utf8Array}, - compute::cast::{self, CastOptions}, - datatypes::{ArrowDataType as DataType, TimeUnit}, - ffi, - offset::Offset, - types::NativeType, -}; - -use itertools::Itertools; -use pyo3::{ - create_exception, exceptions::PyException, ffi::Py_uintptr_t, types::IntoPyDict, PyAny, PyErr, - PyResult, Python, -}; - -#[derive(Debug)] -pub(crate) struct PretendDF { - pub(crate) names: Vec, - pub(crate) arrays: Vec>>, -} - -impl PretendDF { - pub fn check_cols_exist(&self, cols: &[&str]) -> Result<(), GraphError> { - let non_cols: Vec<&&str> = cols - .iter() - .filter(|c| !self.names.contains(&c.to_string())) - .collect(); - if non_cols.len() > 0 { - return Err(GraphError::ColumnDoesNotExist(non_cols.iter().join(", "))); - } - - Ok(()) - } - - pub(crate) fn iter_col( - &self, - name: &str, - ) -> Option> + '_> { - let idx = self.names.iter().position(|n| n == name)?; - - let _ = (&self.arrays[0])[idx] - .as_any() - .downcast_ref::>()?; - - let iter = self.arrays.iter().flat_map(move |arr| { - let arr = &arr[idx]; - let arr = arr.as_any().downcast_ref::>().unwrap(); - arr.iter() - }); - - Some(iter) - } - - pub fn utf8(&self, name: &str) -> Option> + '_> { - let idx = self.names.iter().position(|n| n == name)?; - // test that it's actually a utf8 array - let _ = (&self.arrays[0])[idx] - .as_any() - .downcast_ref::>()?; - - let iter = self.arrays.iter().flat_map(move |arr| { - let arr = &arr[idx]; - let arr = arr.as_any().downcast_ref::>().unwrap(); - arr.iter() - }); - - Some(iter) - } - - pub fn time_iter_col(&self, name: &str) -> Option> + '_> { - let idx = self.names.iter().position(|n| n == name)?; - - let _ = (&self.arrays[0])[idx] - .as_any() - .downcast_ref::>()?; - - let iter = self.arrays.iter().flat_map(move |arr| { - let arr = &arr[idx]; - let arr = if let DataType::Timestamp(_, _) = arr.data_type() { - let array = cast::cast( - &*arr.clone(), - &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_string())), - CastOptions::default(), - ) - .unwrap(); - array - } else { - arr.clone() - }; - - let arr = arr.as_any().downcast_ref::>().unwrap(); - arr.clone().into_iter() - }); - - Some(iter) - } -} - -fn is_jupyter(py: Python) { - let code = r#" -try: - shell = get_ipython().__class__.__name__ - if shell == 'ZMQInteractiveShell': - result = True # Jupyter notebook or qtconsole - elif shell == 'TerminalInteractiveShell': - result = False # Terminal running IPython - else: - result = False # Other type, assuming not a Jupyter environment -except NameError: - result = False # Probably standard Python interpreter -"#; - - if let Err(e) = py.run(code, None, None) { - println!("Error checking if running in a jupyter notebook: {}", e); - return; - } - - match py.eval("result", None, None) { - Ok(x) => { - if let Ok(x) = x.extract() { - kdam::set_notebook(x); - } - } - Err(e) => { - println!("Error checking if running in a jupyter notebook: {}", e); - } - }; -} - -pub(crate) fn process_pandas_py_df( - df: &PyAny, - py: Python, - _size: usize, - col_names: Vec<&str>, -) -> PyResult { - is_jupyter(py); - py.import("pandas")?; - let module = py.import("pyarrow")?; - let pa_table = module.getattr("Table")?; - - let df_columns: Vec = df.getattr("columns")?.extract()?; - - let cols_to_drop: Vec = df_columns - .into_iter() - .filter(|x| !col_names.contains(&x.as_str())) - .collect(); - - let dropped_df = if !cols_to_drop.is_empty() { - let drop_method = df.getattr("drop")?; - drop_method.call((cols_to_drop,), Some(vec![("axis", 1)].into_py_dict(py)))? - } else { - df - }; - - let _df_columns: Vec = dropped_df.getattr("columns")?.extract()?; - - let table = pa_table.call_method("from_pandas", (dropped_df,), None)?; - - let rb = table.call_method0("to_batches")?.extract::>()?; - let names: Vec = if let Some(batch0) = rb.get(0) { - let schema = batch0.getattr("schema")?; - schema.getattr("names")?.extract::>()? - } else { - vec![] - } - .into_iter() - .filter(|x| col_names.contains(&x.as_str())) - .collect(); - - let arrays = rb - .iter() - .map(|rb| { - (0..names.len()) - .map(|i| { - let array = rb.call_method1("column", (i,))?; - let arr = array_to_rust(array)?; - Ok::, PyErr>(arr) - }) - .collect::, PyErr>>() - }) - .collect::, PyErr>>()?; - - let df = PretendDF { names, arrays }; - Ok(df) -} - -pub fn array_to_rust(obj: &PyAny) -> PyResult { - // prepare a pointer to receive the Array struct - let array = Box::new(ffi::ArrowArray::empty()); - let schema = Box::new(ffi::ArrowSchema::empty()); - - let array_ptr = &*array as *const ffi::ArrowArray; - let schema_ptr = &*schema as *const ffi::ArrowSchema; - - // make the conversion through PyArrow's private API - // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds - obj.call_method1( - "_export_to_c", - (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), - )?; - - unsafe { - let field = ffi::import_field_from_c(schema.as_ref()) - .map_err(|e| ArrowErrorException::new_err(format!("{:?}", e)))?; - - let array = ffi::import_array_from_c(*array, field.data_type) - .map_err(|e| ArrowErrorException::new_err(format!("{:?}", e)))?; - - Ok(array) - } -} - -pub type ArrayRef = Box; - -create_exception!(exceptions, ArrowErrorException, PyException); -create_exception!(exceptions, GraphLoadException, PyException); diff --git a/raphtory/src/python/packages/graph_loader.rs b/raphtory/src/python/packages/graph_loader.rs index c7784a9891..3ed52dae8f 100644 --- a/raphtory/src/python/packages/graph_loader.rs +++ b/raphtory/src/python/packages/graph_loader.rs @@ -26,7 +26,7 @@ use tokio::runtime::Runtime; /// A Graph containing the LOTR dataset #[pyfunction] pub fn lotr_graph() -> PyResult> { - PyGraph::py_from_db_graph(crate::graph_loader::example::lotr_graph::lotr_graph()) + PyGraph::py_from_db_graph(crate::graph_loader::lotr_graph::lotr_graph()) } /// Load (a subset of) Reddit hyperlinks dataset into a graph. @@ -67,9 +67,10 @@ pub fn lotr_graph() -> PyResult> { #[pyfunction] #[pyo3(signature = (timeout_seconds=600))] pub fn reddit_hyperlink_graph(timeout_seconds: u64) -> PyResult> { - PyGraph::py_from_db_graph( - crate::graph_loader::example::reddit_hyperlinks::reddit_graph(timeout_seconds, false), - ) + PyGraph::py_from_db_graph(crate::graph_loader::reddit_hyperlinks::reddit_graph( + timeout_seconds, + false, + )) } #[pyfunction] @@ -77,19 +78,17 @@ pub fn reddit_hyperlink_graph(timeout_seconds: u64) -> PyResult> { pub fn reddit_hyperlink_graph_local(file_path: &str) -> PyResult> { let file_path_buf = PathBuf::from(file_path); PyGraph::py_from_db_graph( - crate::graph_loader::example::reddit_hyperlinks::generate_reddit_graph(file_path_buf), + crate::graph_loader::reddit_hyperlinks::generate_reddit_graph(file_path_buf), ) } #[pyfunction] #[pyo3(signature = (path=None,subset=None))] pub fn stable_coin_graph(path: Option, subset: Option) -> PyResult> { - PyGraph::py_from_db_graph( - crate::graph_loader::example::stable_coins::stable_coin_graph( - path, - subset.unwrap_or(false), - ), - ) + PyGraph::py_from_db_graph(crate::graph_loader::stable_coins::stable_coin_graph( + path, + subset.unwrap_or(false), + )) } #[pyfunction] @@ -100,11 +99,12 @@ pub fn neo4j_movie_graph( password: String, database: String, ) -> PyResult> { - let g = Runtime::new().unwrap().block_on( - crate::graph_loader::example::neo4j_examples::neo4j_movie_graph( - uri, username, password, database, - ), - ); + let g = + Runtime::new() + .unwrap() + .block_on(crate::graph_loader::neo4j_examples::neo4j_movie_graph( + uri, username, password, database, + )); PyGraph::py_from_db_graph(g) } @@ -131,5 +131,5 @@ pub fn neo4j_movie_graph( #[pyfunction] #[pyo3(signature = ())] pub fn karate_club_graph() -> PyResult> { - PyGraph::py_from_db_graph(crate::graph_loader::example::karate_club::karate_club_graph()) + PyGraph::py_from_db_graph(crate::graph_loader::karate_club::karate_club_graph()) } diff --git a/raphtory/src/python/utils/errors.rs b/raphtory/src/python/utils/errors.rs index 7a67443639..f242220f10 100644 --- a/raphtory/src/python/utils/errors.rs +++ b/raphtory/src/python/utils/errors.rs @@ -1,6 +1,6 @@ use crate::{ core::utils::{errors::GraphError, time::error::ParseTimeError}, - graph_loader::source::csv_loader::CsvErr, + io::csv_loader::CsvErr, }; use pyo3::{exceptions::PyException, PyErr}; use std::error::Error;