Parquet loader #1666

Merged: 35 commits, Jun 26, 2024. Changes shown are from 21 of the 35 commits.

Commits
168dbb1 rename graph/pandas to graph/io (shivam-880, Jun 18, 2024)
1826f12 rename utils to panda_loaders (shivam-880, Jun 18, 2024)
d433bbc refactor panda loader related functions to panda_loader from df_loaders (shivam-880, Jun 18, 2024)
f0da7b2 init parquet loader (shivam-880, Jun 18, 2024)
d3ad3d8 impl and test process_parquet_file_to_df (shivam-880, Jun 19, 2024)
4280dba impl load edges from parquet for graph (shivam-880, Jun 19, 2024)
fc9f2ed impl/test load nodes from parquet and fix names order issue (shivam-880, Jun 19, 2024)
db8c036 impl/test load from parquet. load node/edge props from parquet (shivam-880, Jun 19, 2024)
69ffe89 add tests for props (shivam-880, Jun 20, 2024)
8ab1b64 ref tests (shivam-880, Jun 20, 2024)
e09f387 simplify tests (shivam-880, Jun 20, 2024)
7744a32 more tests (shivam-880, Jun 20, 2024)
76238f1 impl loaders for persistent graphs (shivam-880, Jun 20, 2024)
ff0e6eb impl loader tests for persistent graphs (shivam-880, Jun 20, 2024)
ec1e9ab move load_edges_deletions_from_pandas to panda_loaders (shivam-880, Jun 20, 2024)
38a4fb6 impl load_edges_deletions_from_parquet (shivam-880, Jun 20, 2024)
5e1dd62 impl edge deletions tests (shivam-880, Jun 20, 2024)
5326733 fmt (shivam-880, Jun 20, 2024)
96267dc add py dep (shivam-880, Jun 20, 2024)
1323b32 add deps (shivam-880, Jun 20, 2024)
330890c move compression to arrow dep (shivam-880, Jun 20, 2024)
116d26c restruct io (shivam-880, Jun 21, 2024)
c48c750 restruct arrow (shivam-880, Jun 21, 2024)
4e63229 tmpdir impl (shivam-880, Jun 21, 2024)
c32bf86 add feature gates (shivam-880, Jun 21, 2024)
7178d01 make parquet loader compatible for rust (shivam-880, Jun 21, 2024)
c3048c8 move py impls to python package (shivam-880, Jun 22, 2024)
dcf50de rename PretendDF to DFView, panda_loaders to pandas_loaders (shivam-880, Jun 24, 2024)
8d57736 rid parent (shivam-880, Jun 24, 2024)
bbb065f make test create parquet (shivam-880, Jun 24, 2024)
f27c9a0 load parquet from dir or file (shivam-880, Jun 24, 2024)
f102175 change invalid layers error message to include valid layers (shivam-880, Jun 24, 2024)
05a586a Merge branch 'master' into parquet-loader (shivam-880, Jun 24, 2024)
07d12f3 fix issue with valid_layers (shivam-880, Jun 26, 2024)
4c7695f Merge branch 'master' into parquet-loader (shivam-880, Jun 26, 2024)
Binary file added python/tests/data/parquet/edges.parquet
Binary file added python/tests/data/parquet/edges_deletions.parquet
Binary file added python/tests/data/parquet/nodes.parquet
434 changes: 434 additions & 0 deletions python/tests/test_load_from_parquet.py

Large diffs are not rendered by default.
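The new test module is not rendered above, so here is a rough sketch of the round trip it exercises (a hedged illustration, not code from the PR; the column names, values, and temporary path are assumptions):

    import pandas as pd
    from raphtory import Graph

    # Write a tiny edge list as Parquet (pandas delegates to pyarrow).
    edges = pd.DataFrame({
        "src": [1, 2, 3],
        "dst": [2, 3, 4],
        "time": [10, 20, 30],
        "weight": [1.0, 2.0, 3.0],
    })
    edges.to_parquet("/tmp/edges.parquet", index=False)

    # Load it back through the new Parquet loader.
    g = Graph()
    g.load_edges_from_parquet(
        "/tmp/edges.parquet",
        src="src", dst="dst", time="time",
        properties=["weight"],
    )
    assert g.count_edges() == 3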

11 changes: 7 additions & 4 deletions raphtory/Cargo.toml
@@ -104,10 +104,10 @@ io = [
# Enables generating the pyo3 python bindings
python = [
"io",
"arrow",
"dep:pyo3",
"dep:num",
"dep:display-error-chain",
"dep:polars-arrow",
"polars-arrow?/compute",
"dep:kdam",
"dep:rpds",
@@ -118,10 +118,9 @@ search = ["dep:tantivy"]
vectors = ["dep:futures-util", "dep:async-trait", "dep:async-openai"]
# storage
storage = [
"arrow",
"pometry-storage",
"dep:polars-arrow",
"dep:polars-utils",
"dep:polars-parquet",
"dep:memmap2",
"dep:ahash",
"dep:tempfile",
@@ -130,5 +129,9 @@
"dep:thread_local",
"polars-arrow?/io_ipc",
"polars-arrow?/arrow_rs",
"polars-parquet?/compression",
]
arrow = [
"dep:polars-arrow",
"dep:polars-parquet",
"polars-parquet?/compression"
]
Binary file added raphtory/resources/test/test_data.parquet
5 changes: 5 additions & 0 deletions raphtory/src/core/utils/errors.rs
@@ -1,11 +1,16 @@
use crate::core::{utils::time::error::ParseTimeError, ArcStr, Prop, PropType};
#[cfg(feature = "arrow")]
use polars_arrow::legacy::error;
#[cfg(feature = "search")]
use tantivy;
#[cfg(feature = "search")]
use tantivy::query::QueryParserError;

#[derive(thiserror::Error, Debug)]
pub enum GraphError {
#[cfg(feature = "arrow")]
#[error("Arrow error: {0}")]
Arrow(#[from] error::PolarsError),
#[error("Graph error occurred")]
UnsupportedDataType,
#[error("Graph already exists by name = {name}")]
4 changes: 2 additions & 2 deletions raphtory/src/python/graph/disk_graph.rs
@@ -33,7 +33,7 @@ use pyo3::{
types::{IntoPyDict, PyDict, PyList, PyString},
};

use super::pandas::dataframe::{process_pandas_py_df, PretendDF};
use super::io::{dataframe::PretendDF, panda_loaders::*};

impl From<Error> for PyErr {
fn from(value: Error) -> Self {
@@ -172,7 +172,7 @@ impl PyDiskGraph {
let df_columns: Vec<String> = edge_df.getattr("columns")?.extract()?;
let df_columns: Vec<&str> = df_columns.iter().map(|x| x.as_str()).collect();

let df = process_pandas_py_df(edge_df, py, size, df_columns)?;
let df = process_pandas_py_df(edge_df, py, df_columns)?;

df.check_cols_exist(&cols_to_check)?;
let graph = Self::from_pandas(graph_dir, df, src_col, dst_col, time_col)?;
236 changes: 229 additions & 7 deletions raphtory/src/python/graph/graph.rs
@@ -3,7 +3,6 @@
//! This is the base class used to create a temporal graph, add nodes and edges,
//! create windows, and query the graph with a variety of algorithms.
//! In Python, this class wraps around the rust graph.
use super::utils;
use crate::{
algorithms::components::LargestConnectedComponent,
core::{entities::nodes::node_ref::NodeRef, utils::errors::GraphError, ArcStr},
@@ -14,7 +13,10 @@ use crate::{
prelude::*,
python::{
graph::{
edge::PyEdge, graph_with_deletions::PyPersistentGraph, node::PyNode,
edge::PyEdge,
graph_with_deletions::PyPersistentGraph,
io::{panda_loaders::*, parquet_loaders::*},
node::PyNode,
views::graph_view::PyGraphView,
},
utils::{PyInputNode, PyTime},
@@ -24,7 +26,7 @@ use pyo3::{prelude::*, types::PyBytes};
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
path::Path,
path::{Path, PathBuf},
};

/// A temporal graph.
@@ -458,6 +460,83 @@ impl PyGraph {
Ok(graph.graph)
}

/// Load a graph from a Parquet file.
///
/// Args:
/// edge_parquet_file_path (str): Parquet file containing the edges.
/// edge_src (str): The column name for the source node ids.
/// edge_dst (str): The column name for the destination node ids.
/// edge_time (str): The column name for the timestamps.
/// edge_properties (list): The column names for the temporal edge properties (optional). Defaults to None.
/// edge_const_properties (list): The column names for the constant edge properties (optional). Defaults to None.
/// edge_shared_const_properties (dict): A dictionary of constant properties that will be added to every edge (optional). Defaults to None.
/// edge_layer (str): The edge layer name (optional). Defaults to None.
/// layer_in_df (bool): Whether edge_layer names a column of the dataframe to read per-edge layer values from, or should be used directly as the layer for all edges (optional). Defaults to True.
/// node_parquet_file_path (str): Parquet file containing the nodes (optional). Defaults to None.
/// node_id (str): The column name for the node ids (optional). Defaults to None.
/// node_time (str): The column name for the node timestamps (optional). Defaults to None.
/// node_properties (list): The column names for the node temporal properties (optional). Defaults to None.
/// node_const_properties (list): The column names for the node constant properties (optional). Defaults to None.
/// node_shared_const_properties (dict): A dictionary of constant properties that will be added to every node (optional). Defaults to None.
/// node_type (str): The column name for the node type (optional). Defaults to None.
/// node_type_in_df (bool): Whether node_type names a column of the dataframe to read per-node types from, or should be used directly as the node type for all nodes (optional). Defaults to True.
///
/// Returns:
/// Graph: The loaded Graph object.
#[staticmethod]
#[pyo3(signature = (edge_parquet_file_path, edge_src, edge_dst, edge_time, edge_properties = None, edge_const_properties = None, edge_shared_const_properties = None,
edge_layer = None, layer_in_df = true, node_parquet_file_path = None, node_id = None, node_time = None, node_properties = None,
node_const_properties = None, node_shared_const_properties = None, node_type = None, node_type_in_df = true))]
fn load_from_parquet(
edge_parquet_file_path: PathBuf,
edge_src: &str,
edge_dst: &str,
edge_time: &str,
edge_properties: Option<Vec<&str>>,
edge_const_properties: Option<Vec<&str>>,
edge_shared_const_properties: Option<HashMap<String, Prop>>,
edge_layer: Option<&str>,
layer_in_df: Option<bool>,
node_parquet_file_path: Option<PathBuf>,
node_id: Option<&str>,
node_time: Option<&str>,
node_properties: Option<Vec<&str>>,
node_const_properties: Option<Vec<&str>>,
node_shared_const_properties: Option<HashMap<String, Prop>>,
node_type: Option<&str>,
node_type_in_df: Option<bool>,
) -> Result<Graph, GraphError> {
let graph = PyGraph {
graph: Graph::new(),
};
if let (Some(node_parquet_file_path), Some(node_id), Some(node_time)) =
(node_parquet_file_path, node_id, node_time)
{
graph.load_nodes_from_parquet(
node_parquet_file_path,
node_id,
node_time,
node_type,
node_type_in_df,
node_properties,
node_const_properties,
node_shared_const_properties,
)?;
}
graph.load_edges_from_parquet(
edge_parquet_file_path,
edge_src,
edge_dst,
edge_time,
edge_properties,
edge_const_properties,
edge_shared_const_properties,
edge_layer,
layer_in_df,
)?;
Ok(graph.graph)
}
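A minimal usage sketch of the new constructor from Python (the file paths and column names are illustrative assumptions, not taken from the PR):

    from raphtory import Graph

    # Edges are required; nodes are only loaded when node_parquet_file_path,
    # node_id and node_time are all supplied, and they are loaded before the
    # edges, so node types are already in place when the edges arrive.
    g = Graph.load_from_parquet(
        edge_parquet_file_path="data/parquet/edges.parquet",
        edge_src="src",
        edge_dst="dst",
        edge_time="time",
        edge_properties=["weight"],
        node_parquet_file_path="data/parquet/nodes.parquet",
        node_id="id",
        node_time="time",
        node_type="node_type",
    )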

/// Load nodes from a Pandas DataFrame into the graph.
///
/// Arguments:
@@ -483,7 +562,7 @@
const_properties: Option<Vec<&str>>,
shared_const_properties: Option<HashMap<String, Prop>>,
) -> Result<(), GraphError> {
utils::load_nodes_from_pandas(
load_nodes_from_pandas(
&self.graph.0,
df,
id,
@@ -496,6 +575,44 @@
)
}

/// Load nodes from a Parquet file into the graph.
///
/// Arguments:
/// parquet_file_path (str): Parquet file path containing the nodes.
/// id (str): The column name for the node IDs.
/// time (str): The column name for the timestamps.
/// node_type (str): The column name for the node type (optional). Defaults to None.
/// node_type_in_df (bool): Whether node_type names a column of the dataframe to read per-node types from, or should be used directly as the node type for all nodes (optional). Defaults to True.
/// properties (List<str>): List of node property column names (optional). Defaults to None.
/// const_properties (List<str>): List of constant node property column names (optional). Defaults to None.
/// shared_const_properties (dict): A dictionary of constant properties that will be added to every node (optional). Defaults to None.
/// Returns:
/// Result<(), GraphError>: Result of the operation.
#[pyo3(signature = (parquet_file_path, id, time, node_type = None, node_type_in_df = true, properties = None, const_properties = None, shared_const_properties = None))]
fn load_nodes_from_parquet(
&self,
parquet_file_path: PathBuf,
id: &str,
time: &str,
node_type: Option<&str>,
node_type_in_df: Option<bool>,
properties: Option<Vec<&str>>,
const_properties: Option<Vec<&str>>,
shared_const_properties: Option<HashMap<String, Prop>>,
) -> Result<(), GraphError> {
load_nodes_from_parquet(
&self.graph.0,
parquet_file_path.as_path(),
id,
time,
node_type,
node_type_in_df,
properties,
const_properties,
shared_const_properties,
)
}
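As an illustration of the nodes loader on its own (column names assumed, not from the PR):

    g = Graph()
    g.load_nodes_from_parquet(
        "data/parquet/nodes.parquet",
        id="id",
        time="time",
        node_type="node_type",  # with node_type_in_df=True (default) this names a column
        properties=["name"],
    )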

/// Load edges from a Pandas DataFrame into the graph.
///
/// Arguments:
@@ -524,7 +641,7 @@
layer: Option<&str>,
layer_in_df: Option<bool>,
) -> Result<(), GraphError> {
utils::load_edges_from_pandas(
load_edges_from_pandas(
&self.graph.0,
df,
src,
@@ -538,6 +655,48 @@
)
}

/// Load edges from a Parquet file into the graph.
///
/// Arguments:
/// parquet_file_path (str): Parquet file path containing the edges.
/// src (str): The column name for the source node ids.
/// dst (str): The column name for the destination node ids.
/// time (str): The column name for the update timestamps.
/// properties (List<str>): List of edge property column names (optional). Defaults to None.
/// const_properties (List<str>): List of constant edge property column names (optional). Defaults to None.
/// shared_const_properties (dict): A dictionary of constant properties that will be added to every edge (optional). Defaults to None.
/// layer (str): The edge layer name (optional). Defaults to None.
/// layer_in_df (bool): Whether layer names a column of the dataframe to read per-edge layer values from, or should be used directly as the layer for all edges (optional). Defaults to True.
///
/// Returns:
/// Result<(), GraphError>: Result of the operation.
#[pyo3(signature = (parquet_file_path, src, dst, time, properties = None, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))]
fn load_edges_from_parquet(
&self,
parquet_file_path: PathBuf,
src: &str,
dst: &str,
time: &str,
properties: Option<Vec<&str>>,
const_properties: Option<Vec<&str>>,
shared_const_properties: Option<HashMap<String, Prop>>,
layer: Option<&str>,
layer_in_df: Option<bool>,
) -> Result<(), GraphError> {
load_edges_from_parquet(
&self.graph.0,
parquet_file_path.as_path(),
src,
dst,
time,
properties,
const_properties,
shared_const_properties,
layer,
layer_in_df,
)
}
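The layer/layer_in_df pair mirrors the pandas loader; a hedged sketch of both modes (column and layer names assumed):

    g = Graph()
    # layer_in_df=True (the default): "layer" names a column holding a layer per edge.
    g.load_edges_from_parquet(
        "data/parquet/edges.parquet",
        src="src", dst="dst", time="time",
        layer="layers",
    )
    # layer_in_df=False: the string itself is the layer for every edge in the file.
    g.load_edges_from_parquet(
        "data/parquet/edges.parquet",
        src="src", dst="dst", time="time",
        layer="friends", layer_in_df=False,
    )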

/// Load node properties from a Pandas DataFrame.
///
/// Arguments:
@@ -556,7 +715,7 @@
const_properties: Option<Vec<&str>>,
shared_const_properties: Option<HashMap<String, Prop>>,
) -> Result<(), GraphError> {
utils::load_node_props_from_pandas(
load_node_props_from_pandas(
&self.graph.0,
df,
id,
@@ -565,6 +724,33 @@
)
}

/// Load node properties from a Parquet file.
///
/// Arguments:
/// parquet_file_path (str): Parquet file path containing node information.
/// id (str): The column name for the node IDs.
/// const_properties (List<str>): List of constant node property column names (optional). Defaults to None.
/// shared_const_properties (dict): A dictionary of constant properties that will be added to every node (optional). Defaults to None.
///
/// Returns:
/// Result<(), GraphError>: Result of the operation.
#[pyo3(signature = (parquet_file_path, id, const_properties = None, shared_const_properties = None))]
fn load_node_props_from_parquet(
&self,
parquet_file_path: PathBuf,
id: &str,
const_properties: Option<Vec<&str>>,
shared_const_properties: Option<HashMap<String, Prop>>,
) -> Result<(), GraphError> {
load_node_props_from_parquet(
&self.graph.0,
parquet_file_path.as_path(),
id,
const_properties,
shared_const_properties,
)
}
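For illustration (column and property names assumed), attaching constant properties to nodes that were already loaded:

    # const_properties reads per-node values from columns; shared_const_properties
    # applies the same value to every node that appears in the file.
    g.load_node_props_from_parquet(
        "data/parquet/nodes.parquet",
        id="id",
        const_properties=["type"],
        shared_const_properties={"tag": "test_tag"},
    )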

/// Load edge properties from a Pandas DataFrame.
///
/// Arguments:
Expand All @@ -589,7 +775,7 @@ impl PyGraph {
layer: Option<&str>,
layer_in_df: Option<bool>,
) -> Result<(), GraphError> {
utils::load_edge_props_from_pandas(
load_edge_props_from_pandas(
&self.graph.0,
df,
src,
@@ -600,4 +786,40 @@
layer_in_df,
)
}

/// Load edge properties from a Parquet file.
///
/// Arguments:
/// parquet_file_path (str): Parquet file path containing edge information.
/// src (str): The column name for the source node.
/// dst (str): The column name for the destination node.
/// const_properties (List<str>): List of constant edge property column names (optional). Defaults to None.
/// shared_const_properties (dict): A dictionary of constant properties that will be added to every edge (optional). Defaults to None.
/// layer (str): The edge layer name (optional). Defaults to None.
/// layer_in_df (bool): Whether layer names a column of the dataframe to read per-edge layer values from, or should be used directly as the layer for all edges (optional). Defaults to True.
///
/// Returns:
/// Result<(), GraphError>: Result of the operation.
#[pyo3(signature = (parquet_file_path, src, dst, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))]
fn load_edge_props_from_parquet(
&self,
parquet_file_path: PathBuf,
src: &str,
dst: &str,
const_properties: Option<Vec<&str>>,
shared_const_properties: Option<HashMap<String, Prop>>,
layer: Option<&str>,
layer_in_df: Option<bool>,
) -> Result<(), GraphError> {
load_edge_props_from_parquet(
&self.graph.0,
parquet_file_path.as_path(),
src,
dst,
const_properties,
shared_const_properties,
layer,
layer_in_df,
)
}
}
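And the edge equivalent, again as an assumed-name sketch: enriching edges that already exist in the graph with constant properties:

    g.load_edge_props_from_parquet(
        "data/parquet/edges.parquet",
        src="src", dst="dst",
        const_properties=["label"],
        shared_const_properties={"source": "parquet"},
    )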